rocketmq-format-style
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 45d9320..8bf061b 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -43,7 +43,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
-
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);
@@ -57,18 +56,18 @@
private ObjectMapper objectMapper;
private RocketMQMessageConverter rocketMQMessageConverter;
- public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
- RocketMQMessageConverter rocketMQMessageConverter,
- StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
- this.rocketMQMessageConverter = rocketMQMessageConverter;
- this.objectMapper = rocketMQMessageObjectMapper;
- this.environment = environment;
- this.rocketMQProperties = rocketMQProperties;
- }
+ public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
+ RocketMQMessageConverter rocketMQMessageConverter,
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+ this.rocketMQMessageConverter = rocketMQMessageConverter;
+ this.objectMapper = rocketMQMessageObjectMapper;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}
@Override
@@ -88,7 +87,7 @@
}
ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
validate(annotation, genericApplicationContext);
DefaultMQProducer mqProducer = createProducer(annotation);
@@ -96,11 +95,12 @@
mqProducer.setInstanceName(beanName);
try {
mqProducer.start();
- } catch (MQClientException e) {
- throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
- beanName), e);
}
- RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+ catch (MQClientException e) {
+ throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
+ beanName), e);
+ }
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
rocketMQTemplate.setObjectMapper(objectMapper);
@@ -127,9 +127,10 @@
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
- annotation.enableMsgTrace(), customizedTraceTopic);
+ annotation.enableMsgTrace(), customizedTraceTopic);
producer.setVipChannelEnabled(false);
- } else {
+ }
+ else {
producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
}
@@ -144,18 +145,19 @@
return producer;
}
- private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
+ private void validate(ExtRocketMQTemplateConfiguration annotation,
+ GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
- "please check the @ExtRocketMQTemplateConfiguration",
- annotation.value()));
+ "please check the @ExtRocketMQTemplateConfiguration",
+ annotation.value()));
}
if (rocketMQProperties.getNameServer() == null ||
- rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
+ rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
throw new BeanDefinitionValidationException(
- "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
- "global property, please use the default RocketMQTemplate!");
+ "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
+ "global property, please use the default RocketMQTemplate!");
}
}
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 99a9877..82d90d2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -45,7 +45,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
-
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
@@ -61,18 +60,18 @@
private ObjectMapper objectMapper;
private RocketMQMessageConverter rocketMQMessageConverter;
- public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
- RocketMQMessageConverter rocketMQMessageConverter,
- StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
- this.objectMapper = rocketMQMessageObjectMapper;
- this.rocketMQMessageConverter = rocketMQMessageConverter;
- this.environment = environment;
- this.rocketMQProperties = rocketMQProperties;
- }
+ public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
+ RocketMQMessageConverter rocketMQMessageConverter,
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+ this.objectMapper = rocketMQMessageObjectMapper;
+ this.rocketMQMessageConverter = rocketMQMessageConverter;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}
@Override
@@ -110,7 +109,7 @@
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
@@ -119,7 +118,8 @@
if (!container.isRunning()) {
try {
container.start();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
@@ -128,7 +128,8 @@
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
- private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
+ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
+ RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
@@ -141,7 +142,7 @@
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
- container.setRocketMQListener((RocketMQListener) bean);
+ container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
index f8bf7b6..5f7e419 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
@@ -23,18 +23,15 @@
import org.springframework.context.annotation.Configuration;
/**
- *
* @see RocketMQMessageConverter
- *
- * @author zkz
*/
@Configuration
@ConditionalOnMissingBean(RocketMQMessageConverter.class)
class MessageConverterConfiguration {
- @Bean
- public RocketMQMessageConverter createRocketMQMessageConverter() {
- return new RocketMQMessageConverter();
- }
+ @Bean
+ public RocketMQMessageConverter createRocketMQMessageConverter() {
+ return new RocketMQMessageConverter();
+ }
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 41246e6..91b2609 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -52,10 +52,10 @@
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
-@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class})
+@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
-@Import({ JacksonFallbackConfiguration.class,MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class })
-@AutoConfigureAfter({JacksonFallbackConfiguration.class,MessageConverterConfiguration.class})
+@Import({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
+@AutoConfigureAfter({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class})
public class RocketMQAutoConfiguration {
private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
@@ -71,7 +71,6 @@
}
}
-
@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
@@ -92,7 +91,8 @@
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
- } else {
+ }
+ else {
producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
@@ -114,23 +114,23 @@
@Bean(destroyMethod = "destroy")
@ConditionalOnBean(DefaultMQProducer.class)
@ConditionalOnMissingBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
- public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
- ObjectMapper rocketMQMessageObjectMapper,
- RocketMQMessageConverter rocketMQMessageConverter) {
- RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
- rocketMQTemplate.setProducer(mqProducer);
- rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
- rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
- return rocketMQTemplate;
- }
+ public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
+ ObjectMapper rocketMQMessageObjectMapper,
+ RocketMQMessageConverter rocketMQMessageConverter) {
+ RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
+ rocketMQTemplate.setProducer(mqProducer);
+ rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
+ rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
+ return rocketMQTemplate;
+ }
@Bean
@ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
- public TransactionHandlerRegistry transactionHandlerRegistry(
- @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
- return new TransactionHandlerRegistry(template);
- }
+ public TransactionHandlerRegistry transactionHandlerRegistry(
+ @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
+ return new TransactionHandlerRegistry(template);
+ }
@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnBean(TransactionHandlerRegistry.class)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
index 1809874..0ec0161 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
@@ -38,15 +38,15 @@
listenerContainers.clear();
}
- public void registerTransactionHandler(TransactionHandler handler)
- throws MQClientException {
- if (listenerContainers.contains(handler.getName())) {
- throw new MQClientException(-1, String.format(
- "The transaction name [%s] has been defined in TransactionListener [%s]",
- handler.getName(), handler.getBeanName()));
- }
- listenerContainers.add(handler.getName());
- rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
- handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
- }
+ public void registerTransactionHandler(TransactionHandler handler)
+ throws MQClientException {
+ if (listenerContainers.contains(handler.getName())) {
+ throw new MQClientException(-1, String.format(
+ "The transaction name [%s] has been defined in TransactionListener [%s]",
+ handler.getName(), handler.getBeanName()));
+ }
+ listenerContainers.add(handler.getName());
+ rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
+ handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
+ }
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 64c4343..51f442a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -52,7 +52,7 @@
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
- private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
+ private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
private DefaultMQProducer producer;
@@ -72,10 +72,12 @@
public void setProducer(DefaultMQProducer producer) {
this.producer = producer;
}
+
@Deprecated
public ObjectMapper getObjectMapper() {
return objectMapper;
}
+
@Deprecated
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
@@ -97,7 +99,6 @@
this.messageQueueSelector = messageQueueSelector;
}
-
/**
* <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
@@ -145,7 +146,7 @@
try {
long now = System.currentTimeMillis();
Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
- for (Message<?> msg:messages) {
+ for (Message<?> msg : messages) {
if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
log.warn("Found a message empty in the batch, skip it");
continue;
@@ -155,11 +156,12 @@
SendResult sendResult = producer.send(rmqMsgs, timeout);
long costTime = System.currentTimeMillis() - now;
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
throw new MessagingException(e.getMessage(), e);
}
@@ -187,11 +189,12 @@
}
SendResult sendResult = producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -252,11 +255,12 @@
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -287,8 +291,10 @@
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendOrderly(destination, message, hashKey, timeout);
}
+
/**
- * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
+ * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
+ * addition.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
@@ -296,7 +302,8 @@
* @param timeout send timeout with millis
* @param delayLevel level for the delay message
*/
- public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
+ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
+ int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
@@ -307,11 +314,13 @@
rocketMsg.setDelayTimeLevel(delayLevel);
}
producer.send(rocketMsg, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
+
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
@@ -321,11 +330,12 @@
* @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
- asyncSend(destination,message,sendCallback,timeout,0);
+ asyncSend(destination, message, sendCallback, timeout, 0);
}
/**
- * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
+ * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time
+ * sensitive
* business scenarios. </p>
* <p>
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
@@ -377,7 +387,7 @@
* @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
- long timeout) {
+ long timeout) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
@@ -385,7 +395,8 @@
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -425,7 +436,7 @@
* @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
- long timeout) {
+ long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build();
asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
}
@@ -447,7 +458,8 @@
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -479,7 +491,8 @@
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -506,13 +519,11 @@
@Override
protected void doSend(String destination, Message<?> message) {
SendResult sendResult = syncSend(destination, message);
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("send message to `{}` finished. result:{}", destination, sendResult);
}
}
-
-
@Override
protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
Message<?> message = super.doConvert(payload, headers, postProcessor);
@@ -560,19 +571,22 @@
* @return TransactionSendResult
* @throws MessagingException
*/
- public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
+ public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
+ final Message<?> message, final Object arg) throws MessagingException {
try {
TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return txProducer.sendMessageInTransaction(rocketMsg, arg);
- } catch (MQClientException e) {
+ }
+ catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
/**
* Remove a TransactionMQProducer from cache by manual.
- * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to directly
+ * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to
+ * directly
* use this method by user.
*
* @param txProducerGroup
@@ -595,13 +609,13 @@
* @param txProducerGroup Producer (group) name, unique for each producer
* @param transactionListener TransactoinListener impl class
* @param executorService Nullable.
- * @param rpcHook Nullable.
+ * @param rpcHook Nullable.
* @return true if producer is created and started; false if the named producer already exists in cache.
* @throws MessagingException
*/
public boolean createAndStartTransactionMQProducer(String txProducerGroup,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
txProducerGroup = getTxProducerGroupName(txProducerGroup);
if (cache.containsKey(txProducerGroup)) {
log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
@@ -612,7 +626,8 @@
try {
txProducer.start();
cache.put(txProducerGroup, txProducer);
- } catch (MQClientException e) {
+ }
+ catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
@@ -620,8 +635,8 @@
}
private TransactionMQProducer createTransactionMQProducer(String name,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) {
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) {
Assert.notNull(producer, "Property 'producer' is required");
Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
TransactionMQProducer txProducer;
@@ -629,7 +644,8 @@
txProducer = new TransactionMQProducer(name, rpcHook);
txProducer.setVipChannelEnabled(false);
txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
- } else {
+ }
+ else {
txProducer = new TransactionMQProducer(name);
}
txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
@@ -648,13 +664,12 @@
return txProducer;
}
-
-
- private org.apache.rocketmq.common.message.Message createRocketMqMessage(
- String destination, Message<?> message) {
- Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
- return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
- destination, msg);
- }
-
+
+ private org.apache.rocketmq.common.message.Message createRocketMqMessage(
+ String destination, Message<?> message) {
+ Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
+ return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
+ destination, msg);
+ }
+
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
index 24c765b..f8dbee9 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
@@ -29,70 +29,66 @@
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.ClassUtils;
-
/**
* @see MessageConverter
* @see CompositeMessageConverter
- *
- * @author zkz
*/
public class RocketMQMessageConverter {
+ private static final boolean JACKSON_PRESENT;
+ private static final boolean FASTJSON_PRESENT;
- private static final boolean jackson2Present;
- private static final boolean fastJsonPresent;
+ static {
+ ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
+ JACKSON_PRESENT =
+ ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
+ ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
+ FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) &&
+ ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader);
+ }
- static {
- ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
- jackson2Present =
- ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
- ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
- fastJsonPresent = ClassUtils.isPresent("com.alibaba.fastjson.JSON",classLoader)&&
- ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig",classLoader);
- }
+ private final CompositeMessageConverter messageConverter;
+ public RocketMQMessageConverter() {
+ List<MessageConverter> messageConverters = new ArrayList<>();
+ ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
+ byteArrayMessageConverter.setContentTypeResolver(null);
+ messageConverters.add(byteArrayMessageConverter);
+ messageConverters.add(new StringMessageConverter());
+ if (JACKSON_PRESENT) {
+ messageConverters.add(new MappingJackson2MessageConverter());
+ }
+ if (FASTJSON_PRESENT) {
+ try {
+ messageConverters.add(
+ (MessageConverter)ClassUtils.forName(
+ "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
+ ClassUtils.getDefaultClassLoader()).newInstance());
+ }
+ catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ }
+ }
+ messageConverter = new CompositeMessageConverter(messageConverters);
+ }
+ public MessageConverter getMessageConverter() {
+ return messageConverter;
+ }
- private final CompositeMessageConverter messageConverter;
+ public MessageConverter resetMessageConverter(
+ Collection<MessageConverter> converters) {
+ if (messageConverter.getConverters() != null) {
+ messageConverter.getConverters().clear();
+ }
+ Objects.requireNonNull(messageConverter.getConverters()).addAll(converters);
+ return messageConverter;
+ }
- public RocketMQMessageConverter() {
- List<MessageConverter> messageConverters = new ArrayList<>();
- ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
- byteArrayMessageConverter.setContentTypeResolver(null);
- messageConverters.add(byteArrayMessageConverter);
- messageConverters.add(new StringMessageConverter());
- if(jackson2Present) {
- messageConverters.add(new MappingJackson2MessageConverter());
- }
- if(fastJsonPresent){
- try {
- messageConverters.add((MessageConverter)
- ClassUtils.forName("com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter"
- ,ClassUtils.getDefaultClassLoader()).newInstance());
- } catch (ClassNotFoundException |IllegalAccessException |InstantiationException e) {
- }
- }
- messageConverter = new CompositeMessageConverter(messageConverters);
- }
-
- public MessageConverter getMessageConverter() {
- return messageConverter;
- }
-
- public MessageConverter resetMessageConverter(
- Collection<MessageConverter> converters) {
- if (messageConverter.getConverters() != null) {
- messageConverter.getConverters().clear();
- }
- Objects.requireNonNull(messageConverter.getConverters()).addAll(converters);
- return messageConverter;
- }
-
- public MessageConverter addMessageConverter(MessageConverter converter) {
- if (messageConverter.getConverters() != null && converter != null) {
- messageConverter.getConverters().add(converter);
- }
- return messageConverter;
- }
+ public MessageConverter addMessageConverter(MessageConverter converter) {
+ if (messageConverter.getConverters() != null && converter != null) {
+ messageConverter.getConverters().add(converter);
+ }
+ return messageConverter;
+ }
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index fb4d2b3..720486d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -107,8 +107,8 @@
if (!CollectionUtils.isEmpty(properties)) {
properties.forEach((key, val) -> {
if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
- && !MessageHeaders.TIMESTAMP.equals(key) &&
- (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
+ && !MessageHeaders.TIMESTAMP.equals(key) &&
+ (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
messageBuilder.setHeader(key, val);
}
});
@@ -136,14 +136,17 @@
byte[] payloads;
if (payloadObj instanceof String) {
- payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
- } else if (payloadObj instanceof byte[]) {
- payloads = (byte[]) message.getPayload();
- } else {
+ payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+ }
+ else if (payloadObj instanceof byte[]) {
+ payloads = (byte[])message.getPayload();
+ }
+ else {
try {
String jsonObj = objectMapper.writeValueAsString(payloadObj);
payloads = jsonObj.getBytes(Charset.forName(charset));
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new RuntimeException("convert to RocketMQ message failed.", e);
}
}
@@ -151,14 +154,15 @@
}
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
- String destination, org.springframework.messaging.Message<byte[]> message){
- return getAndWrapMessage(destination,message.getHeaders(),message.getPayload());
+ String destination, org.springframework.messaging.Message<byte[]> message) {
+ return getAndWrapMessage(destination, message.getHeaders(), message.getPayload());
}
+
private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
- if(destination==null || destination.length()<1){
+ if (destination == null || destination.length() < 1) {
return null;
}
- if(payloads == null || payloads.length<1){
+ if (payloads == null || payloads.length < 1) {
return null;
}
String[] tempArr = destination.split(":", 2);
@@ -177,9 +181,10 @@
int flag = 0;
try {
flag = Integer.parseInt(flagObj.toString());
- } catch (NumberFormatException e) {
+ }
+ catch (NumberFormatException e) {
// Ignore it
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info("flag must be integer, flagObj:{}", flagObj);
}
}
@@ -187,58 +192,56 @@
Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
rocketMsg.setWaitStoreMsgOK(Boolean.TRUE.equals(waitStoreMsgOkObj));
headers.entrySet().stream()
- .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
- && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
- .forEach(entry -> {
- if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
- rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
- }
- });
+ .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
+ && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
+ .forEach(entry -> {
+ if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
+ rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
+ }
+ });
}
return rocketMsg;
}
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
- MessageConverter messageConverter, String charset,
- String destination, org.springframework.messaging.Message<?> message) {
+ MessageConverter messageConverter, String charset,
+ String destination, org.springframework.messaging.Message<?> message) {
Object payloadObj = message.getPayload();
byte[] payloads;
try {
- if(null == payloadObj){
+ if (null == payloadObj) {
throw new RuntimeException("the message cannot be empty");
}
if (payloadObj instanceof String) {
- payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
- } else if (payloadObj instanceof byte[]) {
- payloads = (byte[]) message.getPayload();
- } else {
- String jsonObj = (String) messageConverter.fromMessage(message, payloadObj.getClass());
+ payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+ }
+ else if (payloadObj instanceof byte[]) {
+ payloads = (byte[])message.getPayload();
+ }
+ else {
+ String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
- "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
- messageConverter.getClass(), payloadObj.getClass(), payloadObj));
+ "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+ messageConverter.getClass(), payloadObj.getClass(), payloadObj));
}
payloads = jsonObj.getBytes(Charset.forName(charset));
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new RuntimeException("convert to RocketMQ message failed.", e);
}
return getAndWrapMessage(destination, message.getHeaders(), payloads);
}
-
-
-
-
-
-
public static RPCHook getRPCHookByAkSk(Environment env, String accessKeyOrExpr, String secretKeyOrExpr) {
String ak, sk;
try {
ak = env.resolveRequiredPlaceholders(accessKeyOrExpr);
sk = env.resolveRequiredPlaceholders(secretKeyOrExpr);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
// Ignore it
ak = null;
sk = null;
@@ -252,7 +255,7 @@
public static String getInstanceName(RPCHook rpcHook, String identify) {
String separator = "|";
StringBuilder instanceName = new StringBuilder();
- SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
+ SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
instanceName.append(sessionCredentials.getAccessKey())
.append(separator).append(sessionCredentials.getSecretKey())
.append(separator).append(identify)