blob: 046f3a338ed171976c86d495da58891a18248688 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.core;
import org.apache.rocketmq.client.common.Pair;
import org.apache.rocketmq.client.support.RocketMQMessageConverter;
import org.apache.rocketmq.client.support.RocketMQUtil;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.support.MessageBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQClientTemplate extends AbstractMessageSendingTemplate<String> implements DisposableBean {
private static final Logger log = LoggerFactory.getLogger(RocketMQClientTemplate.class);
private ProducerBuilder producerBuilder;
private SimpleConsumerBuilder simpleConsumerBuilder;
private volatile Producer producer;
private volatile SimpleConsumer simpleConsumer;
private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
private String charset = "UTF-8";
public Producer getProducer() {
if (Objects.isNull(producer)) {
synchronized (RocketMQClientTemplate.class) {
if (Objects.isNull(producer)) {
try {
this.producer = producerBuilder.build();
} catch (ClientException e) {
throw new RuntimeException(e);
}
}
}
}
return producer;
}
public void setProducer(Producer producer) {
this.producer = producer;
}
public SimpleConsumer getSimpleConsumer() {
if (Objects.isNull(simpleConsumer)) {
synchronized (RocketMQClientTemplate.class) {
if (Objects.isNull(simpleConsumer)) {
try {
this.simpleConsumer = simpleConsumerBuilder.build();
} catch (ClientException e) {
throw new RuntimeException(e);
}
}
}
}
return simpleConsumer;
}
public void setSimpleConsumer(SimpleConsumer simpleConsumer) {
this.simpleConsumer = simpleConsumer;
}
public ProducerBuilder getProducerBuilder() {
return producerBuilder;
}
public void setProducerBuilder(ProducerBuilder producerBuilder) {
this.producerBuilder = producerBuilder;
}
public SimpleConsumerBuilder getSimpleConsumerBuilder() {
return simpleConsumerBuilder;
}
public void setSimpleConsumerBuilder(SimpleConsumerBuilder simpleConsumerBuilder) {
this.simpleConsumerBuilder = simpleConsumerBuilder;
}
public RocketMQMessageConverter getRocketMQMessageConverter() {
return rocketMQMessageConverter;
}
public void setRocketMQMessageConverter(RocketMQMessageConverter rocketMQMessageConverter) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
this.charset = charset;
}
@Override
public void destroy() throws Exception {
if (Objects.nonNull(producer)) {
producer.close();
}
if (Objects.nonNull(simpleConsumer)) {
simpleConsumer.close();
}
}
@Override
protected void doSend(String destination, Message<?> message) {
SendReceipt sendReceipt = syncSendGrpcMessage(destination, message, null, null);
if (log.isDebugEnabled()) {
log.debug("send message to `{}` finished. result:{}", destination, sendReceipt);
}
}
/**
* @param destination formats: `topicName:tags`
* @param payload the payload to be sent
* @param messageDelayTime Time for message delay
* @return SendReceipt Synchronous Task Results
*/
public SendReceipt syncSendDelayMessage(String destination, Object payload, Duration messageDelayTime) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, messageDelayTime, null);
}
public SendReceipt syncSendDelayMessage(String destination, String payload, Duration messageDelayTime) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, messageDelayTime, null);
}
public SendReceipt syncSendDelayMessage(String destination, byte[] payload, Duration messageDelayTime) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, messageDelayTime, null);
}
public SendReceipt syncSendDelayMessage(String destination, Message<?> message, Duration messageDelayTime) {
return syncSendGrpcMessage(destination, message, messageDelayTime, null);
}
public SendReceipt syncSendFifoMessage(String destination, Object payload, String messageGroup) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, messageGroup);
}
public SendReceipt syncSendFifoMessage(String destination, String payload, String messageGroup) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, messageGroup);
}
public SendReceipt syncSendFifoMessage(String destination, byte[] payload, String messageGroup) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, messageGroup);
}
public SendReceipt syncSendFifoMessage(String destination, Message<?> message, String messageGroup) {
return syncSendGrpcMessage(destination, message, null, messageGroup);
}
public SendReceipt syncSendNormalMessage(String destination, Object payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, null);
}
public SendReceipt syncSendNormalMessage(String destination, String payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, null);
}
public SendReceipt syncSendNormalMessage(String destination, Message<?> message) {
return syncSendGrpcMessage(destination, message, null, null);
}
public SendReceipt syncSendNormalMessage(String destination, byte[] payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return syncSendGrpcMessage(destination, message, null, null);
}
/**
* @param destination formats: `topicName:tags`
* @param message {@link Message} the message to be sent.
* @param messageDelayTime Time for message delay
* @param messageGroup message group name
* @return SendReceipt Synchronous Task Results
*/
public SendReceipt syncSendGrpcMessage(String destination, Message<?> message, Duration messageDelayTime, String messageGroup) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
SendReceipt sendReceipt = null;
try {
org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, messageDelayTime, messageGroup);
Producer grpcProducer = this.getProducer();
try {
sendReceipt = grpcProducer.send(rocketMsg);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
return sendReceipt;
}
public CompletableFuture<SendReceipt> asyncSendWithObjectPayload(String destination, Object payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return asyncSend(destination, message, messageDelayTime, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendWithStringPayload(String destination, String payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return asyncSend(destination, message, messageDelayTime, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendWithBytePayload(String destination, byte[] payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return asyncSend(destination, message, messageDelayTime, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendWithMessagePayload(String destination, Message<?> payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
return asyncSend(destination, payload, messageDelayTime, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, Object payload, CompletableFuture<SendReceipt> future) {
return asyncSendWithObjectPayload(destination, payload, null, null, future);
}
public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, String payload, CompletableFuture<SendReceipt> future) {
return asyncSendWithStringPayload(destination, payload, null, null, future);
}
public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, byte[] payload, CompletableFuture<SendReceipt> future) {
return asyncSendWithBytePayload(destination, payload, null, null, future);
}
public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, Message<?> payload, CompletableFuture<SendReceipt> future) {
return asyncSendWithMessagePayload(destination, payload, null, null, future);
}
public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, Object payload, String messageGroup, CompletableFuture<SendReceipt> future) {
return asyncSendWithObjectPayload(destination, payload, null, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, String payload, String messageGroup, CompletableFuture<SendReceipt> future) {
return asyncSendWithStringPayload(destination, payload, null, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, byte[] payload, String messageGroup, CompletableFuture<SendReceipt> future) {
return asyncSendWithBytePayload(destination, payload, null, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, Message<?> payload, String messageGroup, CompletableFuture<SendReceipt> future) {
return asyncSendWithMessagePayload(destination, payload, null, messageGroup, future);
}
public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, Object payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
return asyncSendWithObjectPayload(destination, payload, messageDelayTime, null, future);
}
public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, String payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
return asyncSendWithStringPayload(destination, payload, messageDelayTime, null, future);
}
public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, byte[] payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
return asyncSendWithBytePayload(destination, payload, messageDelayTime, null, future);
}
public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, Message<?> payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
return asyncSendWithMessagePayload(destination, payload, messageDelayTime, null, future);
}
public CompletableFuture<SendReceipt> asyncSend(String destination, Message<?> message, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
Producer grpcProducer = this.getProducer();
try {
org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, messageDelayTime, messageGroup);
future = grpcProducer.sendAsync(rocketMsg);
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
return future;
}
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, Object payload) throws ClientException {
Message<?> message = MessageBuilder.withPayload(payload).build();
return sendTransactionMessage(destination, message);
}
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, String payload) throws ClientException {
Message<?> message = MessageBuilder.withPayload(payload).build();
return sendTransactionMessage(destination, message);
}
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, byte[] payload) throws ClientException {
Message<?> message = MessageBuilder.withPayload(payload).build();
return sendTransactionMessage(destination, message);
}
/**
* @param destination formats: `topicName:tags`
* @param message {@link Message} the message to be sent.
* @return CompletableFuture<SendReceipt> Asynchronous Task Results
*/
public Pair<SendReceipt, Transaction> sendTransactionMessage(String destination, Message<?> message) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
final SendReceipt sendReceipt;
Producer grpcProducer = this.getProducer();
org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, null, null);
final Transaction transaction;
try {
transaction = grpcProducer.beginTransaction();
sendReceipt = grpcProducer.send(rocketMsg, transaction);
log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new RuntimeException(e);
}
return new Pair<>(sendReceipt, transaction);
}
public List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException {
SimpleConsumer simpleConsumer = this.getSimpleConsumer();
return simpleConsumer.receive(maxMessageNum, invisibleDuration);
}
public CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException, IOException {
SimpleConsumer simpleConsumer = this.getSimpleConsumer();
CompletableFuture<List<MessageView>> listCompletableFuture = simpleConsumer.receiveAsync(maxMessageNum, invisibleDuration);
simpleConsumer.close();
return listCompletableFuture;
}
public void ack(MessageView message) throws ClientException {
SimpleConsumer simpleConsumer = this.getSimpleConsumer();
simpleConsumer.ack(message);
}
public CompletableFuture<Void> ackAsync(MessageView messageView) {
SimpleConsumer simpleConsumer = this.getSimpleConsumer();
return simpleConsumer.ackAsync(messageView);
}
private org.apache.rocketmq.client.apis.message.Message createRocketMQMessage(String destination, Message<?> message, Duration messageDelayTime, String messageGroup) {
Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
return RocketMQUtil.convertToClientMessage(getMessageConverter(), charset,
destination, msg, messageDelayTime, messageGroup);
}
}