blob: 089016a923f99ec7bcd20ad3ecf86b7f486a51b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.spring.core;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
/** Class logger. */
private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
/** Producer that every send/request method in this template delegates to; injected via {@link #setProducer}. */
private DefaultMQProducer producer;
/** Charset name, "UTF-8" by default; where it is applied is outside the visible part of this file. */
private String charset = "UTF-8";
/** Queue selector handed to the producer together with the hash key by the orderly/hash-key sends. */
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
/** Message converter holder; NOTE(review): its usage is not visible in this part of the file. */
private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
/**
 * Returns the producer this template sends through.
 *
 * @return the configured {@link DefaultMQProducer}, or {@code null} if none has been set
 */
public DefaultMQProducer getProducer() {
    return this.producer;
}
/**
 * Sets the producer that every send/request method of this template delegates to.
 *
 * @param producer the producer to use
 */
public void setProducer(DefaultMQProducer producer) {
this.producer = producer;
}
/**
 * Returns the configured charset name.
 *
 * @return the charset name ("UTF-8" unless changed through {@link #setCharset(String)})
 */
public String getCharset() {
    return this.charset;
}
/**
 * Sets the charset name kept by this template.
 *
 * @param charset the charset name to store
 */
public void setCharset(String charset) {
this.charset = charset;
}
/**
 * Returns the queue selector used by the orderly / hash-key based sends.
 *
 * @return the current {@link MessageQueueSelector}
 */
public MessageQueueSelector getMessageQueueSelector() {
    return this.messageQueueSelector;
}
/**
 * Replaces the queue selector that is passed to the producer by the orderly / hash-key based sends.
 *
 * @param messageQueueSelector the selector to use
 */
public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
this.messageQueueSelector = messageQueueSelector;
}
/**
 * Sends a request message and returns the converted reply, using the producer's send timeout, no hash key
 * and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.sendAndReceive(destination, message, type, null, defaultTimeout, 0);
}
/**
 * Sends a payload as a request and returns the converted reply, using the producer's send timeout, no hash
 * key and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.sendAndReceive(destination, payload, type, null, defaultTimeout, 0);
}
/**
 * Sends a request message with an explicit timeout and returns the converted reply; no hash key and no
 * delay are applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param timeout send timeout in millis
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
    final String noHashKey = null;
    final int noDelay = 0;
    return this.sendAndReceive(destination, message, type, noHashKey, timeout, noDelay);
}
/**
 * Sends a payload as a request with an explicit timeout and returns the converted reply; no hash key and
 * no delay are applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param timeout send timeout in millis
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
    final String noHashKey = null;
    final int noDelay = 0;
    return this.sendAndReceive(destination, payload, type, noHashKey, timeout, noDelay);
}
/**
 * Sends a request message with an explicit timeout and delay level and returns the converted reply; no
 * hash key is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
    final String noHashKey = null;
    return this.sendAndReceive(destination, message, type, noHashKey, timeout, delayLevel);
}
/**
 * Sends a payload as a request with an explicit timeout and delay level and returns the converted reply;
 * no hash key is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
    final String noHashKey = null;
    return this.sendAndReceive(destination, payload, type, noHashKey, timeout, delayLevel);
}
/**
 * Sends a request message with a hash key and returns the converted reply, using the producer's send
 * timeout and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.sendAndReceive(destination, message, type, hashKey, defaultTimeout, 0);
}
/**
 * Sends a payload as a request with a hash key and returns the converted reply, using the producer's send
 * timeout and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.sendAndReceive(destination, payload, type, hashKey, defaultTimeout, 0);
}
/**
 * Sends a request message with a hash key and an explicit timeout and returns the converted reply; no
 * delay is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
    final int noDelay = 0;
    return this.sendAndReceive(destination, message, type, hashKey, timeout, noDelay);
}
/**
 * Sends a payload as a request with a hash key and an explicit timeout and returns the converted reply;
 * no delay is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, long timeout) {
    final int noDelay = 0;
    return this.sendAndReceive(destination, payload, type, hashKey, timeout, noDelay);
}
/**
 * Sends a request message through the producer and converts the reply to {@code type}.
 * <p>
 * When {@code hashKey} is {@code null} or empty the plain request API is used; otherwise the request is
 * issued with the configured {@link MessageQueueSelector} and the hash key.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis; a value {@code <= 0} falls back to the producer's send timeout
 * @param delayLevel message delay level (0 means no delay)
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the producer returned no reply message
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or sending the request
 */
@SuppressWarnings("unchecked") // the caller chooses T through `type`; the cast cannot be checked at runtime
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
    long timeout, int delayLevel) {
    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
        log.error("send request message failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
        if (delayLevel > 0) {
            rocketMsg.setDelayTimeLevel(delayLevel);
        }
        // Same fallback as the callback-based variant: a non-positive timeout means "use the producer default".
        if (timeout <= 0) {
            timeout = producer.getSendMsgTimeout();
        }
        MessageExt replyMessage;
        if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
            replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
        } else {
            replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
        }
        if (replyMessage == null) {
            return null;
        }
        return (T) doConvertMessage(replyMessage, type);
    } catch (Exception e) {
        log.error("send request message failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Wraps the payload in a Spring message and sends it as a request, returning the converted reply.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param type the type the reply is converted to
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 * @param <T> the expected reply type
 * @return the converted reply, or {@code null} when the request produced no reply message
 */
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
    long timeout, int delayLevel) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    return this.sendAndReceive(destination, wrapped, type, hashKey, timeout, delayLevel);
}
/**
 * Sends a request message and hands the reply to a callback, using the producer's send timeout, no hash
 * key and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, defaultTimeout, 0);
}
/**
 * Sends a payload as a request and hands the reply to a callback, using the producer's send timeout, no
 * hash key and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, defaultTimeout, 0);
}
/**
 * Sends a request message with an explicit timeout and hands the reply to a callback; no hash key and no
 * delay are applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param timeout send timeout in millis
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
    final String noHashKey = null;
    final int noDelay = 0;
    this.sendAndReceive(destination, message, rocketMQLocalRequestCallback, noHashKey, timeout, noDelay);
}
/**
 * Sends a payload as a request with an explicit timeout and hands the reply to a callback; no hash key
 * and no delay are applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param timeout send timeout in millis
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
    final String noHashKey = null;
    final int noDelay = 0;
    this.sendAndReceive(destination, payload, rocketMQLocalRequestCallback, noHashKey, timeout, noDelay);
}
/**
 * Sends a request message with an explicit timeout and delay level and hands the reply to a callback; no
 * hash key is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
    final String noHashKey = null;
    this.sendAndReceive(destination, message, rocketMQLocalRequestCallback, noHashKey, timeout, delayLevel);
}
/**
 * Sends a payload as a request with a hash key and hands the reply to a callback, using the producer's
 * send timeout and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, defaultTimeout, 0);
}
/**
 * Sends a request message with a hash key and an explicit timeout and hands the reply to a callback; no
 * delay is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
    final int noDelay = 0;
    this.sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, noDelay);
}
/**
 * Sends a payload as a request with a hash key and an explicit timeout and hands the reply to a callback;
 * no delay is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
    final int noDelay = 0;
    this.sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, timeout, noDelay);
}
/**
 * Sends a request message with a hash key and hands the reply to a callback, using the producer's send
 * timeout and no delay.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, defaultTimeout, 0);
}
/**
 * Sends a payload as a request with an explicit timeout and delay level and hands the reply to a
 * callback; no hash key is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
    final String noHashKey = null;
    this.sendAndReceive(destination, payload, rocketMQLocalRequestCallback, noHashKey, timeout, delayLevel);
}
/**
 * Wraps the payload in a Spring message and sends it as a request whose reply is handed to a callback.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the payload to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis
 * @param delayLevel message delay level (0 means no delay)
 */
public void sendAndReceive(String destination, Object payload,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    this.sendAndReceive(destination, wrapped, rocketMQLocalRequestCallback, hashKey, timeout, delayLevel);
}
/**
 * Sends a request message in asynchronous mode.
 * <p>
 * On receiving the reply message, <code>rocketMQLocalRequestCallback</code> is invoked with the converted
 * reply; on failure its <code>onException</code> is invoked. When the callback is {@code null}, a
 * {@code null} request callback is passed on to the producer.
 * <p>
 * When {@code hashKey} is {@code null} or empty the plain request API is used; otherwise the request is
 * issued with the configured {@link MessageQueueSelector} and the hash key.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message} the message to be sent.
 * @param rocketMQLocalRequestCallback callback that will be invoked when the reply message is received.
 * @param hashKey needed when sending message orderly
 * @param timeout send timeout in millis; a value {@code <= 0} falls back to the producer's send timeout
 * @param delayLevel message delay level (0 means no delay)
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or sending the request
 */
public void sendAndReceive(String destination, Message<?> message,
    RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
    if (message == null || message.getPayload() == null) {
        log.error("send request message failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        final org.apache.rocketmq.common.message.Message request = this.createRocketMqMessage(destination, message);
        if (delayLevel > 0) {
            request.setDelayTimeLevel(delayLevel);
        }
        // A non-positive timeout means "use the producer default".
        final long effectiveTimeout = timeout > 0 ? timeout : producer.getSendMsgTimeout();

        // Bridge the client-level callback to the template-level one; stays null when no callback was given.
        final RequestCallback bridge;
        if (rocketMQLocalRequestCallback == null) {
            bridge = null;
        } else {
            bridge = new RequestCallback() {
                @Override
                public void onSuccess(org.apache.rocketmq.common.message.Message reply) {
                    rocketMQLocalRequestCallback.onSuccess(
                        doConvertMessage((MessageExt) reply, getMessageType(rocketMQLocalRequestCallback)));
                }

                @Override
                public void onException(Throwable cause) {
                    rocketMQLocalRequestCallback.onException(cause);
                }
            };
        }

        final boolean withoutHashKey = hashKey == null || hashKey.isEmpty();
        if (withoutHashKey) {
            producer.request(request, bridge, effectiveTimeout);
        } else {
            producer.request(request, messageQueueSelector, hashKey, bridge, effectiveTimeout);
        }
    } catch (Exception e) {
        log.error("send request message failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * <p> Sends a message in synchronous mode, using the producer's send timeout. This method returns only
 * when the sending procedure totally completes. Reliable synchronous transmission is used in extensive
 * scenes, such as important notification messages, SMS notification, SMS marketing system, etc. </p>
 * <p>
 * <strong>Warn:</strong> this method has an internal retry mechanism, that is, the internal implementation
 * will retry {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a
 * result, multiple messages may potentially be delivered to broker(s). It's up to the application
 * developers to resolve potential duplication issues.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @return {@link SendResult}
 */
public SendResult syncSend(String destination, Message<?> message) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.syncSend(destination, message, defaultTimeout);
}
/**
 * Same as {@link #syncSend(String, Message)} with the send timeout specified in addition; no delay level
 * is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param timeout send timeout with millis
 * @return {@link SendResult}
 */
public SendResult syncSend(String destination, Message<?> message, long timeout) {
    final int noDelay = 0;
    return this.syncSend(destination, message, timeout, noDelay);
}
/**
 * Sends a batch of messages synchronously within the given timeout.
 * <p>
 * Elements that are {@code null} or carry a {@code null} payload are skipped with a warning. If nothing
 * remains after skipping, the send is rejected instead of handing an empty batch to the producer.
 *
 * @param destination formats: `topicName:tags`
 * @param messages Collection of {@link org.springframework.messaging.Message}
 * @param timeout send timeout with millis
 * @param <T> the concrete message type held by the collection
 * @return {@link SendResult}
 * @throws IllegalArgumentException if {@code messages} is {@code null} or empty
 * @throws MessagingException wrapping any exception raised while converting or sending the batch,
 *         including the rejection of a batch in which every element was skipped
 */
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
    if (Objects.isNull(messages) || messages.isEmpty()) {
        log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
        throw new IllegalArgumentException("`messages` can not be empty");
    }
    try {
        long now = System.currentTimeMillis();
        // At most one RocketMQ message per input element, so the input size is an upper bound.
        Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>(messages.size());
        for (Message msg : messages) {
            if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                log.warn("Found a message empty in the batch, skip it");
                continue;
            }
            rmqMsgs.add(this.createRocketMqMessage(destination, msg));
        }
        if (rmqMsgs.isEmpty()) {
            // Every element was skipped; surfaces as MessagingException through the catch block below.
            throw new IllegalArgumentException("`messages` contains no message with a non-null payload");
        }
        SendResult sendResult = producer.send(rmqMsgs, timeout);
        long costTime = System.currentTimeMillis() - now;
        if (log.isDebugEnabled()) {
            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
        }
        return sendResult;
    } catch (Exception e) {
        log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #syncSend(String, Message)} with the send timeout and a delay level specified in
 * addition. The delay level is applied to the outgoing message only when it is greater than zero.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param timeout send timeout with millis
 * @param delayLevel level for the delay message
 * @return {@link SendResult}
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or sending the message
 */
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
    if (message == null || message.getPayload() == null) {
        log.error("syncSend failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        final long startMillis = System.currentTimeMillis();
        final org.apache.rocketmq.common.message.Message outgoing = this.createRocketMqMessage(destination, message);
        if (delayLevel > 0) {
            outgoing.setDelayTimeLevel(delayLevel);
        }
        final SendResult result = producer.send(outgoing, timeout);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        if (log.isDebugEnabled()) {
            log.debug("send message cost: {} ms, msgId:{}", elapsedMillis, result.getMsgId());
        }
        return result;
    } catch (Exception e) {
        log.error("syncSend failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #syncSend(String, Message)}, taking a raw payload and using the producer's send timeout.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @return {@link SendResult}
 */
public SendResult syncSend(String destination, Object payload) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.syncSend(destination, payload, defaultTimeout);
}
/**
 * Same as {@link #syncSend(String, Object)} with the send timeout specified in addition. The payload is
 * wrapped in a Spring message before sending.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param timeout send timeout with millis
 * @return {@link SendResult}
 */
public SendResult syncSend(String destination, Object payload, long timeout) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    return this.syncSend(destination, wrapped, timeout);
}
/**
 * Same as {@link #syncSend(String, Message)} but sends orderly, selecting the queue by the given hashKey;
 * the producer's send timeout is used.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @return {@link SendResult}
 */
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.syncSendOrderly(destination, message, hashKey, defaultTimeout);
}
/**
 * Same as {@link #syncSendOrderly(String, Message, String)} with the send timeout specified in addition.
 * The message is sent with the configured {@link MessageQueueSelector} and the hash key.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param timeout send timeout with millis
 * @return {@link SendResult}
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or sending the message
 */
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
    if (message == null || message.getPayload() == null) {
        log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        final long startMillis = System.currentTimeMillis();
        final org.apache.rocketmq.common.message.Message outgoing = this.createRocketMqMessage(destination, message);
        final SendResult result = producer.send(outgoing, messageQueueSelector, hashKey, timeout);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        if (log.isDebugEnabled()) {
            log.debug("send message cost: {} ms, msgId:{}", elapsedMillis, result.getMsgId());
        }
        return result;
    } catch (Exception e) {
        log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #syncSend(String, Object)} but sends orderly, selecting the queue by the given hashKey;
 * the producer's send timeout is used.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @return {@link SendResult}
 */
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    return this.syncSendOrderly(destination, payload, hashKey, defaultTimeout);
}
/**
 * Same as {@link #syncSendOrderly(String, Object, String)} with the send timeout specified in addition.
 * The payload is wrapped in a Spring message before sending.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param timeout send timeout with millis
 * @return {@link SendResult}
 */
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    return this.syncSendOrderly(destination, wrapped, hashKey, timeout);
}
/**
 * Same as {@link #asyncSend(String, Message, SendCallback)} with the send timeout and delay level
 * specified in addition. The delay level is applied to the outgoing message only when it is greater than
 * zero.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param sendCallback {@link SendCallback}
 * @param timeout send timeout with millis
 * @param delayLevel level for the delay message
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or submitting the message
 */
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
    int delayLevel) {
    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
        log.error("asyncSend failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
        if (delayLevel > 0) {
            rocketMsg.setDelayTimeLevel(delayLevel);
        }
        producer.send(rocketMsg, sendCallback, timeout);
    } catch (Exception e) {
        // A failed send is an error, logged at the same level as the other send methods of this class.
        log.error("asyncSend failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #asyncSend(String, Message, SendCallback)} with the send timeout specified in addition;
 * no delay level is applied.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param sendCallback {@link SendCallback}
 * @param timeout send timeout with millis
 */
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
    final int noDelay = 0;
    this.asyncSend(destination, message, sendCallback, timeout, noDelay);
}
/**
 * <p> Sends a message to the broker asynchronously, using the producer's send timeout. Asynchronous
 * transmission is generally used in response-time sensitive business scenarios. </p>
 * <p>
 * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
 * <p>
 * Similar to {@link #syncSend(String, Object)}, the internal implementation would potentially retry up to
 * {@link DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which
 * may yield message duplication; application developers are the ones to resolve this potential issue.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param sendCallback {@link SendCallback}
 */
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.asyncSend(destination, message, sendCallback, defaultTimeout);
}
/**
 * Same as {@link #asyncSend(String, Object, SendCallback)} with the send timeout specified in addition.
 * The payload is wrapped in a Spring message before sending.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param sendCallback {@link SendCallback}
 * @param timeout send timeout with millis
 */
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    this.asyncSend(destination, wrapped, sendCallback, timeout);
}
/**
 * Same as {@link #asyncSend(String, Message, SendCallback)}, taking a raw payload and using the
 * producer's send timeout.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param sendCallback {@link SendCallback}
 */
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.asyncSend(destination, payload, sendCallback, defaultTimeout);
}
/**
 * Same as {@link #asyncSendOrderly(String, Message, String, SendCallback)} with the send timeout
 * specified in addition. The message is sent with the configured {@link MessageQueueSelector} and the
 * hash key.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param sendCallback {@link SendCallback}
 * @param timeout send timeout with millis
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or submitting the message
 */
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
    long timeout) {
    if (message == null || message.getPayload() == null) {
        log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        final org.apache.rocketmq.common.message.Message outgoing = this.createRocketMqMessage(destination, message);
        producer.send(outgoing, messageQueueSelector, hashKey, sendCallback, timeout);
    } catch (Exception e) {
        log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #asyncSend(String, Message, SendCallback)} but sends orderly, selecting the queue by the
 * given hashKey; the producer's send timeout is used.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param sendCallback {@link SendCallback}
 */
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.asyncSendOrderly(destination, message, hashKey, sendCallback, defaultTimeout);
}
/**
 * Same as {@link #asyncSendOrderly(String, Message, String, SendCallback)}, taking a raw payload and
 * using the producer's send timeout.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param sendCallback {@link SendCallback}
 */
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
    final int defaultTimeout = producer.getSendMsgTimeout();
    this.asyncSendOrderly(destination, payload, hashKey, sendCallback, defaultTimeout);
}
/**
 * Same as {@link #asyncSendOrderly(String, Object, String, SendCallback)} with the send timeout specified
 * in addition. The payload is wrapped in a Spring message before sending.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param hashKey use this key to select queue. for example: orderId, productId ...
 * @param sendCallback {@link SendCallback}
 * @param timeout send timeout with millis
 */
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
    long timeout) {
    final Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    this.asyncSendOrderly(destination, wrapped, hashKey, sendCallback, timeout);
}
/**
 * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't
 * wait for acknowledgement from the broker before returning. It has maximum throughput, at the price of
 * potential message loss.
 * <p>
 * One-way transmission is used for cases requiring moderate reliability, such as log collection.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @throws IllegalArgumentException if {@code message} or its payload is {@code null}
 * @throws MessagingException wrapping any exception raised while building or sending the message
 */
public void sendOneWay(String destination, Message<?> message) {
    if (message == null || message.getPayload() == null) {
        log.error("sendOneWay failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        final org.apache.rocketmq.common.message.Message outgoing = this.createRocketMqMessage(destination, message);
        producer.sendOneway(outgoing);
    } catch (Exception e) {
        log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #sendOneWay(String, Message)}, except that the given payload is first wrapped in a Spring
 * {@link org.springframework.messaging.Message}.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 */
public void sendOneWay(String destination, Object payload) {
    sendOneWay(destination, MessageBuilder.withPayload(payload).build());
}
/**
 * Same as {@link #sendOneWay(String, Message)}, but sends orderly: the target queue is chosen by this
 * template's message queue selector using the given {@code hashKey}.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param hashKey key used to select the queue, for example: orderId, productId ...
 * @throws IllegalArgumentException if {@code message} or its payload is null
 * @throws MessagingException if building or sending the message fails; the original exception is the cause
 */
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
        log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
        producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interruption; restore the flag so that
            // code further up the stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
        }
        log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
/**
 * Same as {@link #sendOneWayOrderly(String, Message, String)}, except that the given payload is first wrapped
 * in a Spring {@link org.springframework.messaging.Message}.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param hashKey key used to select the queue, for example: orderId, productId ...
 */
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
    Message<?> wrapped = MessageBuilder.withPayload(payload).build();
    sendOneWayOrderly(destination, wrapped, hashKey);
}
/**
 * Starts the producer, if one has been set on this template.
 *
 * @throws Exception if starting the producer fails
 */
@Override
public void afterPropertiesSet() throws Exception {
    if (producer == null) {
        // Nothing to start.
        return;
    }
    producer.start();
}
/**
 * Sends the message through {@code syncSend} and logs the send result at debug level.
 *
 * @param destination formats: `topicName:tags`
 * @param message the message to send
 */
@Override
protected void doSend(String destination, Message<?> message) {
    final SendResult result = syncSend(destination, message);
    if (!log.isDebugEnabled()) {
        return;
    }
    log.debug("send message to `{}` finished. result:{}", destination, result);
}
/**
 * Converts the payload via the superclass and then makes sure the resulting message carries a content-type
 * header, defaulting it to {@code text/plain} when none is present.
 *
 * @param payload the Object to use as payload
 * @param headers headers to apply to the message
 * @param postProcessor post processor handed to the superclass conversion
 * @return the converted message, with a content-type header set
 */
@Override
protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
    Message<?> converted = super.doConvert(payload, headers, postProcessor);
    return MessageBuilder.fromMessage(converted)
        .setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
        .build();
}
/**
 * Shuts down the producer, if one has been set on this template.
 */
@Override
public void destroy() {
    if (producer == null) {
        // Nothing to shut down.
        return;
    }
    producer.shutdown();
}
/**
 * Sends a Spring message in a transaction.
 *
 * @param destination formats: `topicName:tags`
 * @param message {@link org.springframework.messaging.Message}
 * @param arg extra argument passed through to the producer's transactional send
 * @return TransactionSendResult
 * @throws IllegalStateException if the producer is not a {@link TransactionMQProducer} or has no
 *     transaction listener
 * @throws MessagingException if the producer reports an {@link MQClientException}
 */
public TransactionSendResult sendMessageInTransaction(final String destination,
    final Message<?> message, final Object arg) throws MessagingException {
    // Check the producer type up front so that a non-transactional (or missing) producer is reported
    // as a clear IllegalStateException instead of a ClassCastException / NullPointerException from the cast.
    if (!(producer instanceof TransactionMQProducer)) {
        throw new IllegalStateException("The rocketMQTemplate producer is not a TransactionMQProducer");
    }
    try {
        if (((TransactionMQProducer) producer).getTransactionListener() == null) {
            throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
        }
        org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
        return producer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }
}
/**
 * Builds the RocketMQ message for the given destination: the Spring message is first re-run through
 * {@code doConvert} (without a post processor) and then handed to {@code RocketMQUtil.convertToRocketMessage}
 * together with this template's message converter and charset.
 *
 * @param destination formats: `topicName:tags`
 * @param message the Spring message to convert
 * @return the RocketMQ message
 */
private org.apache.rocketmq.common.message.Message createRocketMqMessage(
    String destination, Message<?> message) {
    final Message<?> springMessage = this.doConvert(message.getPayload(), message.getHeaders(), null);
    return RocketMQUtil.convertToRocketMessage(
        getMessageConverter(), charset, destination, springMessage);
}
/**
 * Converts a received RocketMQ message into an object of the requested type.
 * <ul>
 * <li>{@code MessageExt}: the message itself is returned;</li>
 * <li>{@code byte[]}: the raw body is returned;</li>
 * <li>{@code String}: the body decoded with this template's charset is returned;</li>
 * <li>anything else: the decoded body is handed to the message converter.</li>
 * </ul>
 *
 * @param messageExt the received message
 * @param type the target type
 * @return the converted object
 * @throws RuntimeException if the message converter step fails for any reason
 */
private Object doConvertMessage(MessageExt messageExt, Type type) {
    if (Objects.equals(type, MessageExt.class)) {
        return messageExt;
    }
    if (Objects.equals(type, byte[].class)) {
        return messageExt.getBody();
    }

    final String text = new String(messageExt.getBody(), Charset.forName(charset));
    if (Objects.equals(type, String.class)) {
        return text;
    }

    // Any other target type goes through the message converter.
    try {
        if (type instanceof Class) {
            // Plain class, no generic parameter.
            return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(text).build(), (Class<?>) type);
        }
        // Otherwise the type is treated as a ParameterizedType: the converter is cast to
        // SmartMessageConverter and called with the raw type and a null conversion hint.
        // Either cast failing is reported through the catch below.
        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(
            MessageBuilder.withPayload(text).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
    } catch (Exception e) {
        log.error("convert failed. str:{}, msgType:{}", text, type);
        throw new RuntimeException("cannot convert message to " + type, e);
    }
}
/**
 * Determines the type argument with which the given callback implements
 * {@link RocketMQLocalRequestCallback}.
 * <p>
 * The class hierarchy of the callback's ultimate target class is walked from the most derived class upwards,
 * and the first parameterized {@code RocketMQLocalRequestCallback} declaration found is used. The search stops
 * at that first match so that a declaration on a superclass cannot replace the one on the subclass.
 *
 * @param rocketMQLocalRequestCallback the callback to inspect
 * @return the first actual type argument of the matched interface, or {@code Object.class} when no
 *     parameterized declaration is found
 */
private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
    Type matchedGenericInterface = null;
    // Stop climbing as soon as a match is found: the inner break alone only leaves the for loop.
    while (Objects.isNull(matchedGenericInterface) && Objects.nonNull(targetClass)) {
        for (Type type : targetClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType
                && Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class)) {
                matchedGenericInterface = type;
                break;
            }
        }
        targetClass = targetClass.getSuperclass();
    }
    if (Objects.isNull(matchedGenericInterface)) {
        return Object.class;
    }
    Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
    if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
        return actualTypeArguments[0];
    }
    return Object.class;
}
}