blob: 59698fcf548832b8c4657552040323eebcc86037 [file] [log] [blame] [view]
<a name="qk9D5"></a>
# Normal消息发送
<a name="FRI7l"></a>
### 修改application.properties
**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。<br />**demo.rocketmq.normal-topic:** 用户自定义消息发送的topic
```properties
rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=normalTopic
demo.rocketmq.normal-topic=normalTopic
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="BykT5"></a>
### 编写代码
通过@Value注解引入配置文件参数,指定自定义topic<br />通过@Resource注解引入RocketMQClientTemplate容器<br />通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行normal消息的发送(消息的参数类型可选:Object、String、byte[]、Message)
```java
@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
@Value("${demo.rocketmq.normal-topic}")
private String normalTopic;
@Resource
private RocketMQClientTemplate rocketMQClientTemplate;
public static void main(String[] args) {
SpringApplication.run(ClientProducerApplication.class, args);
}
@Override
public void run(String... args) throws ClientException {
testSendNormalMessage();
}
//Test sending normal message
void testSendNormalMessage() {
SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3));
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
withPayload("test message".getBytes()).build());
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
}
@Data
@AllArgsConstructor
public class UserMeaasge implements Serializable {
private int id;
private String userName;
private Byte userAge;
}
}
```
<a name="Rl1D7"></a>
# FIFO消息发送
<a name="XnRuP"></a>
### 修改application.properties
**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。<br />**demo.rocketmq.fifo-topic:** 用户自定义消息发送的topic<br />**demo.rocketmq.message-group=group1:** 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
```properties
rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=fifoTopic
demo.rocketmq.fifo-topic=fifoTopic
demo.rocketmq.message-group=group1
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="QSR1T"></a>
### 编写代码
通过@Value注解引入配置文件参数,指定自定义topic<br />通过@Resource注解引入RocketMQClientTemplate容器<br />通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行fifo消息的发送(参数类型可选:Object、String、byte[]、Message)<br />发送fifo消息时需要设置参数:消费者组(MessageGroup)
```java
@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
@Value("${demo.rocketmq.fifo-topic}")
private String fifoTopic;
@Value("${demo.rocketmq.message-group}")
private String messageGroup;
@Resource
private RocketMQClientTemplate rocketMQClientTemplate;
public static void main(String[] args) {
SpringApplication.run(ClientProducerApplication.class, args);
}
@Override
public void run(String... args) throws ClientException {
testSendFIFOMessage();
}
//Test sending fifo message
void testSendFIFOMessage() {
SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3), messageGroup);
System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder.
withPayload("test message".getBytes()).build(), messageGroup);
System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup);
System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup);
System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
}
@Data
@AllArgsConstructor
public class UserMeaasge implements Serializable {
private int id;
private String userName;
private Byte userAge;
}
}
```
<a name="hn3Wn"></a>
# Delay消息发送
<a name="GvUOb"></a>
### 修改application.properties
**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。<br />**demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
```class
rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=delayTopic
demo.rocketmq.fifo-topic=delayTopic
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="QY1y9"></a>
### 编写代码
通过@Value注解引入配置文件参数,指定自定义topic<br />通过@Resource注解引入RocketMQClientTemplate容器<br />通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行delay消息的发送(消息的参数类型可选:Object、String、byte[]、Message)<br />发送delay消息时需要指定延迟时间:DeliveryTimestamp
```java
@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
@Value("${demo.rocketmq.delay-topic}")
private String delayTopic;
@Resource
private RocketMQClientTemplate rocketMQClientTemplate;
public static void main(String[] args) {
SpringApplication.run(ClientProducerApplication.class, args);
}
@Override
public void run(String... args) throws ClientException {
testSendDelayMessage();
}
//Test sending delay message
void testSendDelayMessage() {
SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10));
System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder.
withPayload("test message".getBytes()).build(), Duration.ofSeconds(30));
System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message",
Duration.ofSeconds(60));
System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8),
Duration.ofSeconds(90));
System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
}
@Data
@AllArgsConstructor
public class UserMeaasge implements Serializable {
int id;
private String userName;
private Byte userAge;
}
}
```
<a name="znYRu"></a>
# 事务消息发送
<a name="PXCrp"></a>
### 修改application.properties
**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。<br />**demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
```class
rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=transTopic
demo.rocketmq.trans-topic=transTopic
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="LAdLL"></a>
### 编写代码
通过@Value注解引入配置文件参数,指定自定义topic<br />通过@Resource注解引入RocketMQClientTemplate容器<br />通过调用**RocketMQClientTemplate#sendMessageInTransaction**方法进行事务消息的发送(消息的参数类型可选:Object、String、byte[]、Message)。<br />发送成功后会收到Pair类型的返回值,其左值代表返回值SendReceipt;右值代表Transaction,可以让用户根据本地事务处理结果的业务逻辑来决定commit还是rollback。<br />使用注解@RocketMQTransactionListener标记一个自定义类,该类必须实现RocketMQTransactionChecker接口,并重写TransactionResolution check(MessageView messageView)方法。
```class
void testSendTransactionMessage() throws ClientException {
Pair<SendReceipt, Transaction> pair;
SendReceipt sendReceipt;
try {
pair = rocketMQClientTemplate.sendMessageInTransaction(transTopic, MessageBuilder.
withPayload(new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3)).setHeader("OrderId", 1).build());
} catch (ClientException e) {
throw new RuntimeException(e);
}
sendReceipt = pair.getSendReceipt();
System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt);
Transaction transaction = pair.getTransaction();
// executed local transaction
if (doLocalTransaction(1)) {
transaction.commit();
} else {
transaction.rollback();
}
}
@RocketMQTransactionListener
static class TransactionListenerImpl implements RocketMQTransactionChecker {
@Override
public TransactionResolution check(MessageView messageView) {
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("Receive transactional message check, message={}", messageView);
return TransactionResolution.COMMIT;
}
log.info("rollback transaction");
return TransactionResolution.ROLLBACK;
}
}
boolean doLocalTransaction(int number) {
log.info("execute local transaction");
return number > 0;
}
```
<a name="PKExg"></a>
# 异步消息发送
<a name="sEJj9"></a>
### 修改application.properties
**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。<br />**demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
```class
rocketmq.producer.endpoints=127.0.0.1:8081
demo.rocketmq.fifo-topic=fifoTopic
demo.rocketmq.delay-topic=delayTopic
demo.rocketmq.normal-topic=normalTopic
demo.rocketmq.message-group=group1
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="r3tyX"></a>
### 编写代码
```class
void testASycSendMessage() {
CompletableFuture<SendReceipt> future0 = new CompletableFuture<>();
CompletableFuture<SendReceipt> future1 = new CompletableFuture<>();
CompletableFuture<SendReceipt> future2 = new CompletableFuture<>();
ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
future0.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
log.error("Failed to send message", throwable);
return;
}
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
}, sendCallbackExecutor);
future1.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
log.error("Failed to send message", throwable);
return;
}
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
}, sendCallbackExecutor);
future2.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
log.error("Failed to send message", throwable);
return;
}
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
}, sendCallbackExecutor);
CompletableFuture<SendReceipt> completableFuture0 = rocketMQClientTemplate.asyncSendNormalMessage(normalTopic, new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3), future0);
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, completableFuture0);
CompletableFuture<SendReceipt> completableFuture1 = rocketMQClientTemplate.asyncSendFifoMessage(fifoTopic, "fifo message",
messageGroup, future1);
System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, completableFuture1);
CompletableFuture<SendReceipt> completableFuture2 = rocketMQClientTemplate.asyncSendDelayMessage(delayTopic,
"delay message".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10), future2);
System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, completableFuture2);
}
```
<a name="o8qgh"></a>
# 接收消息
<a name="H2Yct"></a>
### Push 模式
<a name="FUGKu"></a>
#### 修改application.properties
```class
demo.fifo.rocketmq.endpoints=localhost:8081
demo.fifo.rocketmq.topic=fifoTopic
demo.fifo.rocketmq.consumer-group=fifoGroup
demo.fifo.rocketmq.tag=*
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="BENa4"></a>
#### 编写代码
```java
@Service
@RocketMQMessageListener(endpoints = "${demo.fifo.rocketmq.endpoints:}", topic = "${demo.fifo.rocketmq.topic:}",
consumerGroup = "${demo.fifo.rocketmq.consumer-group:}", tag = "${demo.fifo.rocketmq.tag:}")
public class FifoConsumer implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println("handle my fifo message:" + messageView);
return ConsumeResult.SUCCESS;
}
}
```
<a name="a0aDC"></a>
### Simple 模式
<a name="s9R23"></a>
#### 同步订阅
<a name="QGLUI"></a>
##### 修改application.properties
```class
rocketmq.simple-consumer.endpoints=localhost:8081
rocketmq.simple-consumer.consumer-group=normalGroup
rocketmq.simple-consumer.topic=normalTopic
rocketmq.simple-consumer.tag=*
rocketmq.simple-consumer.filter-expression-type=tag
ext.rocketmq.topic=delayTopic
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
<a name="azBFZ"></a>
##### 编写代码
此时测验原始的RocketMQClientTemplate和我们拓展的ExtRocketMQTemplate是否有效:
1. 首先定义拓展ExtRocketMQTemplate,需要加上@ExtConsumerResetConfiguration,并指定topic等关键字段。
```java
@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}")
public class ExtRocketMQTemplate extends RocketMQClientTemplate {
}
```
2. receiveSimpleConsumerMessage方法消费topic=normalTopic的消息,receiveExtSimpleConsumerMessage方法消费topic=delayTopic的消息。
```java
@SpringBootApplication
public class ClientConsumeApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class);
@Resource
RocketMQClientTemplate rocketMQClientTemplate;
@Resource(name = "extRocketMQTemplate")
RocketMQClientTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ClientConsumeApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
receiveSimpleConsumerMessage();
receiveExtSimpleConsumerMessage();
}
public void receiveSimpleConsumerMessage() throws ClientException {
do {
final List<MessageView> messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15));
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
final MessageId messageId = message.getMessageId();
try {
rocketMQClientTemplate.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
}
public void receiveExtSimpleConsumerMessage() throws ClientException {
do {
final List<MessageView> messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15));
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
final MessageId messageId = message.getMessageId();
try {
rocketMQClientTemplate.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
}
}
```
<a name="aW9hN"></a>
#### 异步订阅
<a name="jBBak"></a>
##### 修改application.properties
```class
rocketmq.simple-consumer.endpoints=localhost:8081
rocketmq.simple-consumer.consumer-group=normalGroup
rocketmq.simple-consumer.topic=normalTopic
rocketmq.simple-consumer.tag=*
rocketmq.simple-consumer.filter-expression-type=tag
```
<a name="mRlwB"></a>
##### 编写代码
```class
public void receiveSimpleConsumerMessageAsynchronously() {
do {
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
// Set individual thread pool for receive callback.
ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
// Set individual thread pool for ack callback.
ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
CompletableFuture<List<MessageView>> future0;
try {
future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration);
} catch (ClientException | IOException e) {
throw new RuntimeException(e);
}
future0.whenCompleteAsync(((messages, throwable) -> {
if (null != throwable) {
log.error("Failed to receive message from remote", throwable);
// Return early.
return;
}
log.info("Received {} message(s)", messages.size());
// Using messageView as key rather than message id because message id may be duplicated.
final Map<MessageView, CompletableFuture<Void>> map =
messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync));
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
future.whenCompleteAsync((v, t) -> {
if (null != t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
// Return early.
return;
}
log.info("Message is acknowledged successfully, messageId={}", messageId);
}, ackCallbackExecutor);
}
}), receiveCallbackExecutor);
} while (true);
}
```
<a name="GSP33"></a>
# ACL功能
<a name="PavXQ"></a>
## Producer端
<a name="k36vr"></a>
### 修改application.properties
```class
rocketmq.producer.endpoints=localhost:8081
rocketmq.producer.topic=normalTopic
rocketmq.producer.access-key=yourAccessKey
rocketmq.producer.secret-key=yourSecretKey
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
<a name="LE6va"></a>
### 编写代码
```java
@SpringBootApplication
public class ClientProducerACLApplication implements CommandLineRunner {
@Resource
private RocketMQClientTemplate rocketMQClientTemplate;
@Value("${demo.acl.rocketmq.normal-topic}")
private String normalTopic;
public static void main(String[] args) {
SpringApplication.run(ClientProducerACLApplication.class, args);
}
@Override
public void run(String... args) throws ClientException {
testSendNormalMessage();
}
void testSendNormalMessage() {
SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
.setId(1).setUserName("name").setUserAge((byte) 3));
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
withPayload("test message".getBytes()).build());
System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
}
}
```
<a name="tqzsy"></a>
## Consumer端
<a name="BkI2m"></a>
### 修改application.properties
```class
demo.acl.rocketmq.endpoints=localhost:8081
demo.acl.rocketmq.topic=normalTopic
demo.acl.rocketmq.consumer-group=normalGroup
demo.acl.rocketmq.tag=*
demo.acl.rocketmq.access-key=yourAccessKey
demo.acl.rocketmq.secret-key=yourSecretKey
```
> 注意:
> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
<a name="yiQdM"></a>
### 编写代码
```java
@Service
@RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}", endpoints = "${demo.acl.rocketmq.endpoints:}", topic = "${demo.acl.rocketmq.topic:}",
consumerGroup = "${demo.acl.rocketmq.consumer-group:}", tag = "${demo.acl.rocketmq.tag:}")
public class ACLConsumer implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println("handle my acl message:" + messageView);
return ConsumeResult.SUCCESS;
}
}
```