Merge pull request #152 from zkzlx/message
[ISSUE #147] An enhancement to message conversion in RocketMQTemplate
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index e348e15..c513a5d 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
</modules>
<properties>
- <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
+ <rocketmq-spring-boot-starter-version>2.0.4-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
index 59ef263..3fa0b31 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
@@ -30,7 +30,7 @@
* MessageExtConsumer, consume listener impl class.
*/
@Service
-@RocketMQMessageListener(topic = "message-ext-topic", selectorExpression = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
+@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
index 677e0ea..04cb17f 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
@@ -31,6 +31,6 @@
@Override
public void onMessage(OrderPaidEvent orderPaidEvent) {
- System.out.printf("------- OrderPaidEventConsumer received: %s \n", orderPaidEvent);
+ System.out.printf("------- OrderPaidEventConsumer received: %s [orderId : %s]\n", orderPaidEvent,orderPaidEvent.getOrderId());
}
}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java
new file mode 100644
index 0000000..b9e3647
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * Demo listener that receives {@link User} payloads and prints each one with its age and name.
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer", nameServer = "${demo.rocketmq.myNameServer}")
+public class UserConsumer implements RocketMQListener<User> {
+    @Override
+    public void onMessage(User message) {
+        System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n",
+            message, message.getUserAge(), message.getUserName());
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
new file mode 100644
index 0000000..4f2579f
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+/** Demo message payload holding a user name and age; setters return {@code this} for chaining. */
+public class User {
+    private String userName;
+    private Byte userAge;
+
+    public String getUserName() {
+        return this.userName;
+    }
+
+    public User setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public Byte getUserAge() {
+        return this.userAge;
+    }
+
+    public User setUserAge(Byte userAge) {
+        this.userAge = userAge;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("User{")
+            .append("userName='").append(userName).append('\'')
+            .append(", userAge=").append(userAge).append('}').toString();
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 404cb10..583cde4 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -7,6 +7,7 @@
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
+demo.rocketmq.topic.user=user-topic
# another nameserver different global
demo.rocketmq.myNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 8ab42e1..8abc4ff 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -20,6 +20,7 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
+import org.apache.rocketmq.samples.springboot.domain.User;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@@ -30,8 +31,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
import javax.annotation.Resource;
import java.math.BigDecimal;
@@ -52,6 +55,9 @@
private String springTransTopic;
@Value("${demo.rocketmq.topic}")
private String springTopic;
+ @Value("${demo.rocketmq.topic.user}")
+ private String userTopic;
+
@Value("${demo.rocketmq.orderTopic}")
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
@@ -69,8 +75,15 @@
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+ sendResult = rocketMQTemplate.syncSend(userTopic, new User().setUserAge((byte)18).setUserName("Kitty"));
+ System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
+
+ sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
+ new User().setUserAge((byte)21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
+ System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
+
// Use the extRocketMQTemplate
- sendResult = extRocketMQTemplate.syncSend(springTopic, "Hello, World!");
+ sendResult = extRocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World!2222".getBytes()).build());
System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
// Send string with spring Message
@@ -79,10 +92,12 @@
// Send user-defined object
rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
+ @Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
+ @Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
@@ -95,7 +110,6 @@
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
-
// Send a batch of strings
testBatchMessages();
@@ -107,7 +121,7 @@
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
- setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
+ setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
@@ -115,9 +129,8 @@
System.out.printf("--- Batch messages send result :" + sr);
}
-
private void testTransaction() throws MessagingException {
- String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+ String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
new file mode 100644
index 0000000..42acc11
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+/** Demo message payload holding a user name and age; setters return {@code this} for chaining. */
+public class User {
+    private String userName;
+    private Byte userAge;
+
+    public String getUserName() {
+        return this.userName;
+    }
+
+    public User setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public Byte getUserAge() {
+        return this.userAge;
+    }
+
+    public User setUserAge(Byte userAge) {
+        this.userAge = userAge;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("User{")
+            .append("userName='").append(userName).append('\'')
+            .append(", userAge=").append(userAge).append('}').toString();
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 3a68505..27a3abc 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -7,5 +7,6 @@
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
+demo.rocketmq.topic.user=user-topic
demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 192bfc9..0e1e37e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -17,13 +17,16 @@
package org.apache.rocketmq.spring.autoconfigure;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+import java.util.Objects;
+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -38,9 +41,7 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
-import java.util.Map;
-import java.util.Objects;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
@@ -53,10 +54,12 @@
private RocketMQProperties rocketMQProperties;
private ObjectMapper objectMapper;
+ private RocketMQMessageConverter rocketMQMessageConverter;
public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
- StandardEnvironment environment,
- RocketMQProperties rocketMQProperties) {
+ RocketMQMessageConverter rocketMQMessageConverter,
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+ this.rocketMQMessageConverter = rocketMQMessageConverter;
this.objectMapper = rocketMQMessageObjectMapper;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
@@ -64,7 +67,7 @@
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}
@Override
@@ -84,7 +87,7 @@
}
ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
validate(annotation, genericApplicationContext);
DefaultMQProducer mqProducer = createProducer(annotation);
@@ -94,13 +97,12 @@
mqProducer.start();
} catch (MQClientException e) {
throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
- beanName), e);
+ beanName), e);
}
- RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
rocketMQTemplate.setProducer(mqProducer);
+ rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
rocketMQTemplate.setObjectMapper(objectMapper);
-
-
log.info("Set real producer to :{} {}", beanName, annotation.value());
}
@@ -124,7 +126,7 @@
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
- annotation.enableMsgTrace(), customizedTraceTopic);
+ annotation.enableMsgTrace(), customizedTraceTopic);
producer.setVipChannelEnabled(false);
} else {
producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
@@ -141,18 +143,19 @@
return producer;
}
- private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
+ private void validate(ExtRocketMQTemplateConfiguration annotation,
+ GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
- "please check the @ExtRocketMQTemplateConfiguration",
- annotation.value()));
+ "please check the @ExtRocketMQTemplateConfiguration",
+ annotation.value()));
}
if (rocketMQProperties.getNameServer() == null ||
- rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
+ rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
throw new BeanDefinitionValidationException(
- "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
- "global property, please use the default RocketMQTemplate!");
+ "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
+ "global property, please use the default RocketMQTemplate!");
}
}
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
index d25ca8d..b6518c0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
@@ -24,6 +24,7 @@
@Configuration
@ConditionalOnMissingBean(ObjectMapper.class)
+@Deprecated
class JacksonFallbackConfiguration {
@Bean
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 927bf44..6462604 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -17,13 +17,18 @@
package org.apache.rocketmq.spring.autoconfigure;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -38,11 +43,7 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
@@ -57,18 +58,20 @@
private RocketMQProperties rocketMQProperties;
private ObjectMapper objectMapper;
+ private RocketMQMessageConverter rocketMQMessageConverter;
public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
- StandardEnvironment environment,
- RocketMQProperties rocketMQProperties) {
+ RocketMQMessageConverter rocketMQMessageConverter,
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.objectMapper = rocketMQMessageObjectMapper;
+ this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}
@Override
@@ -106,7 +109,7 @@
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
@@ -124,7 +127,8 @@
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
- private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
+ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
+ RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
@@ -142,8 +146,10 @@
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
- container.setRocketMQListener((RocketMQListener) bean);
+ container.setRocketMQMessageListener(annotation);
+ container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
+ container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
return container;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
new file mode 100644
index 0000000..5f7e419
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Declares a {@link RocketMQMessageConverter} bean, conditional on no bean of that type being present.
+ */
+@Configuration
+@ConditionalOnMissingBean(RocketMQMessageConverter.class)
+class MessageConverterConfiguration {
+    /** Supplies the fallback converter instance. */
+    @Bean
+    public RocketMQMessageConverter createRocketMQMessageConverter() {
+        final RocketMQMessageConverter fallbackConverter = new RocketMQMessageConverter();
+        return fallbackConverter;
+    }
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 5196369..dbe697b 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,7 +17,8 @@
package org.apache.rocketmq.spring.autoconfigure;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.annotation.PostConstruct;
+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
@@ -27,6 +28,7 @@
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +39,6 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -47,14 +48,14 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import javax.annotation.PostConstruct;
+import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
-@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
+@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
-@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class })
-@AutoConfigureAfter(JacksonAutoConfiguration.class)
+@Import({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
+@AutoConfigureAfter({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class})
public class RocketMQAutoConfiguration {
private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
@@ -70,7 +71,6 @@
}
}
-
@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
@@ -113,18 +113,21 @@
@Bean(destroyMethod = "destroy")
@ConditionalOnBean(DefaultMQProducer.class)
@ConditionalOnMissingBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
- public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
+ public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
+ ObjectMapper rocketMQMessageObjectMapper,
+ RocketMQMessageConverter rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
+ rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}
@Bean
@ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
- public TransactionHandlerRegistry transactionHandlerRegistry(@Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
- RocketMQTemplate template) {
+ public TransactionHandlerRegistry transactionHandlerRegistry(
+ @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
return new TransactionHandlerRegistry(template);
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
index 7307a31..0ec0161 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
@@ -38,15 +38,15 @@
listenerContainers.clear();
}
- public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
+ public void registerTransactionHandler(TransactionHandler handler)
+ throws MQClientException {
if (listenerContainers.contains(handler.getName())) {
- throw new MQClientException(-1,
- String
- .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),
- handler.getBeanName()));
+ throw new MQClientException(-1, String.format(
+ "The transaction name [%s] has been defined in TransactionListener [%s]",
+ handler.getName(), handler.getBeanName()));
}
listenerContainers.add(handler.getName());
-
- rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
+ rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
+ handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
}
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 528615d..a2431cf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,8 +17,13 @@
package org.apache.rocketmq.spring.core;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -30,6 +35,8 @@
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
@@ -41,21 +48,15 @@
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
- private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
+ private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
private DefaultMQProducer producer;
+ @Deprecated
private ObjectMapper objectMapper;
private String charset = "UTF-8";
@@ -72,10 +73,12 @@
this.producer = producer;
}
+ @Deprecated
public ObjectMapper getObjectMapper() {
return objectMapper;
}
+ @Deprecated
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@@ -143,19 +146,19 @@
try {
long now = System.currentTimeMillis();
Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
- org.apache.rocketmq.common.message.Message rocketMsg;
- for (Message msg:messages) {
+ for (Message msg : messages) {
if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
log.warn("Found a message empty in the batch, skip it");
continue;
}
- rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);
- rmqMsgs.add(rocketMsg);
+ rmqMsgs.add(this.createRocketMqMessage(destination, msg));
}
SendResult sendResult = producer.send(rmqMsgs, timeout);
long costTime = System.currentTimeMillis() - now;
- log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ if (log.isDebugEnabled()) {
+ log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
return sendResult;
} catch (Exception e) {
log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
@@ -177,17 +180,17 @@
log.error("syncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
long now = System.currentTimeMillis();
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
SendResult sendResult = producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
- log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ if (log.isDebugEnabled()) {
+ log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
return sendResult;
} catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
@@ -215,7 +218,7 @@
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
- Message<?> message = this.doConvert(payload, null, null);
+ Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSend(destination, message, timeout);
}
@@ -245,14 +248,14 @@
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
long now = System.currentTimeMillis();
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
- log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ if (log.isDebugEnabled()) {
+ log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
return sendResult;
} catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
@@ -282,11 +285,13 @@
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
- Message<?> message = this.doConvert(payload, null, null);
+ Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendOrderly(destination, message, hashKey, timeout);
}
+
/**
- * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
+ * Same as {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
+ * addition.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
@@ -294,15 +299,14 @@
* @param timeout send timeout with millis
* @param delayLevel level for the delay message
*/
- public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
+ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
+ int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
@@ -312,6 +316,7 @@
throw new MessagingException(e.getMessage(), e);
}
}
+
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
@@ -321,12 +326,12 @@
* @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
- asyncSend(destination,message,sendCallback,timeout,0);
+ asyncSend(destination, message, sendCallback, timeout, 0);
}
/**
- * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
- * business scenarios. </p>
+ * <p> Send message to broker asynchronously. Asynchronous transmission is generally used in
+ * response-time-sensitive business scenarios. </p>
* <p>
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
* <p>
@@ -351,7 +356,7 @@
* @param timeout send timeout with millis
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
- Message<?> message = this.doConvert(payload, null, null);
+ Message<?> message = MessageBuilder.withPayload(payload).build();
asyncSend(destination, message, sendCallback, timeout);
}
@@ -377,15 +382,13 @@
* @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
- long timeout) {
+ long timeout) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
} catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
@@ -427,8 +430,8 @@
* @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
- long timeout) {
- Message<?> message = this.doConvert(payload, null, null);
+ long timeout) {
+ Message<?> message = MessageBuilder.withPayload(payload).build();
asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
}
@@ -446,10 +449,8 @@
log.error("sendOneWay failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg);
} catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
@@ -464,7 +465,7 @@
* @param payload the Object to use as payload
*/
public void sendOneWay(String destination, Object payload) {
- Message<?> message = this.doConvert(payload, null, null);
+ Message<?> message = MessageBuilder.withPayload(payload).build();
sendOneWay(destination, message);
}
@@ -480,10 +481,8 @@
log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
-
try {
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
} catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
@@ -498,7 +497,7 @@
* @param payload the Object to use as payload
*/
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
- Message<?> message = this.doConvert(payload, null, null);
+ Message<?> message = MessageBuilder.withPayload(payload).build();
sendOneWayOrderly(destination, message, hashKey);
}
@@ -512,37 +511,17 @@
@Override
protected void doSend(String destination, Message<?> message) {
SendResult sendResult = syncSend(destination, message);
- log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+ if (log.isDebugEnabled()) {
+ log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+ }
}
-
-
@Override
protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
- String content;
- if (payload instanceof String) {
- content = (String) payload;
- } else {
- // If payload not as string, use objectMapper change it.
- try {
- content = objectMapper.writeValueAsString(payload);
- } catch (JsonProcessingException e) {
- log.error("convert payload to String failed. payload:{}", payload);
- throw new RuntimeException("convert to payload to String failed.", e);
- }
- }
-
- MessageBuilder<?> builder = MessageBuilder.withPayload(content);
- if (headers != null) {
- builder.copyHeaders(headers);
- }
+ Message<?> message = super.doConvert(payload, headers, postProcessor);
+ MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
-
- Message<?> message = builder.build();
- if (postProcessor != null) {
- message = postProcessor.postProcessMessage(message);
- }
- return message;
+ return builder.build();
}
@Override
@@ -550,7 +529,6 @@
if (Objects.nonNull(producer)) {
producer.shutdown();
}
-
for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
if (Objects.nonNull(kv.getValue())) {
kv.getValue().shutdown();
@@ -585,11 +563,11 @@
* @return TransactionSendResult
* @throws MessagingException
*/
- public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
+ public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
+ final Message<?> message, final Object arg) throws MessagingException {
try {
TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
- org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
- charset, destination, message);
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return txProducer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
@@ -598,8 +576,8 @@
/**
* Remove a TransactionMQProducer from cache by manual.
- * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to directly
- * use this method by user.
+ * <p>Note: RocketMQTemplate releases all cached producers when the bean is destroyed, so calling this method
+ * directly is not recommended.
*
* @param txProducerGroup
* @throws MessagingException
@@ -621,13 +599,13 @@
* @param txProducerGroup Producer (group) name, unique for each producer
* @param transactionListener TransactoinListener impl class
* @param executorService Nullable.
- * @param rpcHook Nullable.
+ * @param rpcHook Nullable.
* @return true if producer is created and started; false if the named producer already exists in cache.
* @throws MessagingException
*/
public boolean createAndStartTransactionMQProducer(String txProducerGroup,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
txProducerGroup = getTxProducerGroupName(txProducerGroup);
if (cache.containsKey(txProducerGroup)) {
log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
@@ -646,8 +624,8 @@
}
private TransactionMQProducer createTransactionMQProducer(String name,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) {
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) {
Assert.notNull(producer, "Property 'producer' is required");
Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
TransactionMQProducer txProducer;
@@ -675,4 +653,12 @@
return txProducer;
}
+
+ private org.apache.rocketmq.common.message.Message createRocketMqMessage(
+ String destination, Message<?> message) {
+ Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
+ return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
+ destination, msg);
+ }
+
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 497d94b..603ce66 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -17,7 +17,12 @@
package org.apache.rocketmq.spring.support;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Objects;
+
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -45,13 +50,11 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Objects;
+import com.fasterxml.jackson.databind.ObjectMapper;
@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
@@ -85,8 +88,11 @@
private String charset = "UTF-8";
+ @Deprecated
private ObjectMapper objectMapper;
+ private MessageConverter messageConverter;
+
private RocketMQListener rocketMQListener;
private RocketMQMessageListener rocketMQMessageListener;
@@ -164,14 +170,24 @@
this.charset = charset;
}
+ @Deprecated
public ObjectMapper getObjectMapper() {
return objectMapper;
}
+ @Deprecated
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
+ public MessageConverter getMessageConverter() {
+ return messageConverter;
+ }
+
+ public DefaultRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
+ this.messageConverter = messageConverter;
+ return this;
+ }
public RocketMQListener getRocketMQListener() {
return rocketMQListener;
@@ -380,7 +396,7 @@
} else {
// If msgType not string, use objectMapper change it.
try {
- return objectMapper.readValue(str, messageType);
+ return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
new file mode 100644
index 0000000..c2ee1da
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import org.springframework.messaging.converter.ByteArrayMessageConverter;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.StringMessageConverter;
+import org.springframework.util.ClassUtils;
+
+/**
+ * @see MessageConverter
+ * @see CompositeMessageConverter
+ */
+public class RocketMQMessageConverter {
+
+ private static final boolean JACKSON_PRESENT;
+ private static final boolean FASTJSON_PRESENT;
+
+ static {
+ ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
+ JACKSON_PRESENT =
+ ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
+ ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
+ FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) &&
+ ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader);
+ }
+
+ private final CompositeMessageConverter messageConverter;
+
+ public RocketMQMessageConverter() {
+ List<MessageConverter> messageConverters = new ArrayList<>();
+ ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
+ byteArrayMessageConverter.setContentTypeResolver(null);
+ messageConverters.add(byteArrayMessageConverter);
+ messageConverters.add(new StringMessageConverter());
+ if (JACKSON_PRESENT) {
+ messageConverters.add(new MappingJackson2MessageConverter());
+ }
+ if (FASTJSON_PRESENT) {
+ try {
+ messageConverters.add(
+ (MessageConverter)ClassUtils.forName(
+ "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
+ ClassUtils.getDefaultClassLoader()).newInstance());
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) {
+ // FastJSON support is optional: skip its converter when the class cannot be loaded or instantiated
+ }
+ }
+ messageConverter = new CompositeMessageConverter(messageConverters);
+ }
+
+ public MessageConverter getMessageConverter() {
+ return messageConverter;
+ }
+
+ public MessageConverter resetMessageConverter(
+ Collection<MessageConverter> converters) {
+ if (messageConverter.getConverters() != null) {
+ messageConverter.getConverters().clear();
+ }
+ Objects.requireNonNull(messageConverter.getConverters()).addAll(converters);
+ return messageConverter;
+ }
+
+ public MessageConverter addMessageConverter(MessageConverter converter) {
+ if (messageConverter.getConverters() != null && converter != null) {
+ messageConverter.getConverters().add(converter);
+ }
+ return messageConverter;
+ }
+
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index f4deb45..4c731f2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -34,6 +34,7 @@
import org.springframework.core.env.Environment;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -106,8 +107,8 @@
if (!CollectionUtils.isEmpty(properties)) {
properties.forEach((key, val) -> {
if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
- && !MessageHeaders.TIMESTAMP.equals(key) &&
- (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
+ && !MessageHeaders.TIMESTAMP.equals(key) &&
+ (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
messageBuilder.setHeader(key, val);
}
});
@@ -127,6 +128,7 @@
return messageBuilder.build();
}
+ @Deprecated
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
ObjectMapper objectMapper, String charset,
String destination, org.springframework.messaging.Message message) {
@@ -134,9 +136,9 @@
byte[] payloads;
if (payloadObj instanceof String) {
- payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
+ payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
- payloads = (byte[]) message.getPayload();
+ payloads = (byte[])message.getPayload();
} else {
try {
String jsonObj = objectMapper.writeValueAsString(payloadObj);
@@ -145,37 +147,46 @@
throw new RuntimeException("convert to RocketMQ message failed.", e);
}
}
+ return getAndWrapMessage(destination, message.getHeaders(), payloads);
+ }
+ public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
+ String destination, org.springframework.messaging.Message<byte[]> message) {
+ return getAndWrapMessage(destination, message.getHeaders(), message.getPayload());
+ }
+
+ private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
+ if (destination == null || destination.length() < 1) {
+ return null;
+ }
+ if (payloads == null || payloads.length < 1) {
+ return null;
+ }
String[] tempArr = destination.split(":", 2);
String topic = tempArr[0];
String tags = "";
if (tempArr.length > 1) {
tags = tempArr[1];
}
-
- org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads);
-
- MessageHeaders headers = message.getHeaders();
+ Message rocketMsg = new Message(topic, tags, payloads);
if (Objects.nonNull(headers) && !headers.isEmpty()) {
Object keys = headers.get(RocketMQHeaders.KEYS);
if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
rocketMsg.setKeys(keys.toString());
}
-
Object flagObj = headers.getOrDefault("FLAG", "0");
int flag = 0;
try {
flag = Integer.parseInt(flagObj.toString());
} catch (NumberFormatException e) {
// Ignore it
- log.info("flag must be integer, flagObj:{}", flagObj);
+ if (log.isInfoEnabled()) {
+ log.info("flag must be integer, flagObj:{}", flagObj);
+ }
}
rocketMsg.setFlag(flag);
-
Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
- boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
- rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
-
+ rocketMsg.setWaitStoreMsgOK(Boolean.TRUE.equals(waitStoreMsgOkObj));
headers.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
&& !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
@@ -186,10 +197,37 @@
});
}
-
return rocketMsg;
}
+ public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
+ MessageConverter messageConverter, String charset,
+ String destination, org.springframework.messaging.Message<?> message) {
+ Object payloadObj = message.getPayload();
+ byte[] payloads;
+ try {
+ if (null == payloadObj) {
+ throw new RuntimeException("the message cannot be empty");
+ }
+ if (payloadObj instanceof String) {
+ payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+ } else if (payloadObj instanceof byte[]) {
+ payloads = (byte[])message.getPayload();
+ } else {
+ String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
+ if (null == jsonObj) {
+ throw new RuntimeException(String.format(
+ "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+ messageConverter.getClass(), payloadObj.getClass(), payloadObj));
+ }
+ payloads = jsonObj.getBytes(Charset.forName(charset));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("convert to RocketMQ message failed.", e);
+ }
+ return getAndWrapMessage(destination, message.getHeaders(), payloads);
+ }
+
public static RPCHook getRPCHookByAkSk(Environment env, String accessKeyOrExpr, String secretKeyOrExpr) {
String ak, sk;
try {
@@ -209,7 +247,7 @@
public static String getInstanceName(RPCHook rpcHook, String identify) {
String separator = "|";
StringBuilder instanceName = new StringBuilder();
- SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
+ SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
instanceName.append(sessionCredentials.getAccessKey())
.append(separator).append(sessionCredentials.getSecretKey())
.append(separator).append(identify)
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 8f2a5ab..b4c7437 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -29,6 +29,7 @@
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
@@ -117,8 +118,8 @@
withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
run((context) -> {
assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
- DefaultRocketMQListenerContainer.class).getObjectMapper())
- .isSameAs(context.getBean(CustomObjectMapperConfig.class).testObjectMapper());
+ DefaultRocketMQListenerContainer.class).getMessageConverter())
+ .isSameAs(context.getBean(CustomObjectMapperConfig.class).rocketMQMessageConverter().getMessageConverter());
});
}
@@ -128,8 +129,8 @@
withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
run((context) -> {
assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
- DefaultRocketMQListenerContainer.class).getObjectMapper())
- .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageObjectMapper());
+ DefaultRocketMQListenerContainer.class).getMessageConverter())
+ .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageConverter().getMessageConverter());
});
}
@@ -246,6 +247,11 @@
public ObjectMapper testObjectMapper() {
return new ObjectMapper();
}
+ @Bean
+ public RocketMQMessageConverter rocketMQMessageConverter() {
+ return new RocketMQMessageConverter();
+ }
+
}
@@ -261,6 +267,10 @@
public ObjectMapper rocketMQMessageObjectMapper() {
return new ObjectMapper();
}
+ @Bean
+ public RocketMQMessageConverter rocketMQMessageConverter() {
+ return new RocketMQMessageConverter();
+ }
}