rocketmq.producer.topic: 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前预取topic路由。
demo.rocketmq.normal-topic: 用户自定义消息发送的topic
rocketmq.producer.endpoints=127.0.0.1:8081 rocketmq.producer.topic=normalTopic demo.rocketmq.normal-topic=normalTopic
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
通过@Value注解引入配置文件参数,指定自定义topic
通过@Resource注解引入RocketMQClientTemplate容器
通过调用RocketMQClientTemplate#syncSendNormalMessage方法进行normal消息的发送(消息的参数类型可选:Object、String、byte[]、Message)
@SpringBootApplication public class ClientProducerApplication implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class); @Value("${demo.rocketmq.normal-topic}") private String normalTopic; @Resource private RocketMQClientTemplate rocketMQClientTemplate; public static void main(String[] args) { SpringApplication.run(ClientProducerApplication.class, args); } @Override public void run(String... args) throws ClientException { testSendNormalMessage(); } //Test sending normal message void testSendNormalMessage() { SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3)); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message"); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8)); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder. withPayload("test message".getBytes()).build()); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); } @Data @AllArgsConstructor public class UserMeaasge implements Serializable { private int id; private String userName; private Byte userAge; } }
rocketmq.producer.topic: 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前预取topic路由。
demo.rocketmq.fifo-topic: 用户自定义消息发送的topic
demo.rocketmq.message-group=group1: 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
rocketmq.producer.endpoints=127.0.0.1:8081 rocketmq.producer.topic=fifoTopic demo.rocketmq.fifo-topic=fifoTopic demo.rocketmq.message-group=group1
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
通过@Value注解引入配置文件参数,指定自定义topic
通过@Resource注解引入RocketMQClientTemplate容器
通过调用RocketMQClientTemplate#syncSendNormalMessage方法进行fifo消息的发送(参数类型可选:Object、String、byte[]、Message)
发送fifo消息时需要设置参数:消费者组(MessageGroup)
@SpringBootApplication public class ClientProducerApplication implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class); @Value("${demo.rocketmq.fifo-topic}") private String fifoTopic; @Value("${demo.rocketmq.message-group}") private String messageGroup; @Resource private RocketMQClientTemplate rocketMQClientTemplate; public static void main(String[] args) { SpringApplication.run(ClientProducerApplication.class, args); } @Override public void run(String... args) throws ClientException { testSendFIFOMessage(); } //Test sending fifo message void testSendFIFOMessage() { SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3), messageGroup); System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder. withPayload("test message".getBytes()).build(), messageGroup); System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup); System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup); System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt); } @Data @AllArgsConstructor public class UserMeaasge implements Serializable { private int id; private String userName; private Byte userAge; } }
rocketmq.producer.topic: 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前预取topic路由。
demo.rocketmq.delay-topic: 用户自定义消息发送的topic
rocketmq.producer.endpoints=127.0.0.1:8081 rocketmq.producer.topic=delayTopic demo.rocketmq.fifo-topic=delayTopic
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
通过@Value注解引入配置文件参数,指定自定义topic
通过@Resource注解引入RocketMQClientTemplate容器
通过调用RocketMQClientTemplate#syncSendNormalMessage方法进行delay消息的发送(消息的参数类型可选:Object、String、byte[]、Message)
发送delay消息时需要指定延迟时间:DeliveryTimestamp
@SpringBootApplication public class ClientProducerApplication implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class); @Value("${demo.rocketmq.delay-topic}") private String delayTopic; @Resource private RocketMQClientTemplate rocketMQClientTemplate; public static void main(String[] args) { SpringApplication.run(ClientProducerApplication.class, args); } @Override public void run(String... args) throws ClientException { testSendDelayMessage(); } //Test sending delay message void testSendDelayMessage() { SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10)); System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder. withPayload("test message".getBytes()).build(), Duration.ofSeconds(30)); System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message", Duration.ofSeconds(60)); System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(90)); System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt); } @Data @AllArgsConstructor public class UserMeaasge implements Serializable { int id; private String userName; private Byte userAge; } }
rocketmq.producer.topic: 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前预取topic路由。
demo.rocketmq.delay-topic: 用户自定义消息发送的topic
rocketmq.producer.endpoints=127.0.0.1:8081 rocketmq.producer.topic=transTopic demo.rocketmq.trans-topic=transTopic
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
通过@Value注解引入配置文件参数,指定自定义topic
通过@Resource注解引入RocketMQClientTemplate容器
通过调用RocketMQClientTemplate#sendMessageInTransaction方法进行事务消息的发送(消息的参数类型可选:Object、String、byte[]、Message)。
发送成功后会收到Pair类型的返回值,其左值代表返回值SendReceipt;右值代表Transaction,可以让用户根据本地事务处理结果的业务逻辑来决定commit还是rollback。
使用注解@RocketMQTransactionListener标记一个自定义类,该类必须实现RocketMQTransactionChecker接口,并重写TransactionResolution check(MessageView messageView)方法。
void testSendTransactionMessage() throws ClientException { Pair<SendReceipt, Transaction> pair; SendReceipt sendReceipt; try { pair = rocketMQClientTemplate.sendMessageInTransaction(transTopic, MessageBuilder. withPayload(new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3)).setHeader("OrderId", 1).build()); } catch (ClientException e) { throw new RuntimeException(e); } sendReceipt = pair.getSendReceipt(); System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt); Transaction transaction = pair.getTransaction(); // executed local transaction if (doLocalTransaction(1)) { transaction.commit(); } else { transaction.rollback(); } } @RocketMQTransactionListener static class TransactionListenerImpl implements RocketMQTransactionChecker { @Override public TransactionResolution check(MessageView messageView) { if (Objects.nonNull(messageView.getProperties().get("OrderId"))) { log.info("Receive transactional message check, message={}", messageView); return TransactionResolution.COMMIT; } log.info("rollback transaction"); return TransactionResolution.ROLLBACK; } } boolean doLocalTransaction(int number) { log.info("execute local transaction"); return number > 0; }
rocketmq.producer.topic: 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前预取topic路由。
demo.rocketmq.delay-topic: 用户自定义消息发送的topic
rocketmq.producer.endpoints=127.0.0.1:8081 demo.rocketmq.fifo-topic=fifoTopic demo.rocketmq.delay-topic=delayTopic demo.rocketmq.normal-topic=normalTopic demo.rocketmq.message-group=group1
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
void testASycSendMessage() { CompletableFuture<SendReceipt> future0 = new CompletableFuture<>(); CompletableFuture<SendReceipt> future1 = new CompletableFuture<>(); CompletableFuture<SendReceipt> future2 = new CompletableFuture<>(); ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool(); future0.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { log.error("Failed to send message", throwable); return; } log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); future1.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { log.error("Failed to send message", throwable); return; } log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); future2.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { log.error("Failed to send message", throwable); return; } log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); CompletableFuture<SendReceipt> completableFuture0 = rocketMQClientTemplate.asyncSendNormalMessage(normalTopic, new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3), future0); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, completableFuture0); CompletableFuture<SendReceipt> completableFuture1 = rocketMQClientTemplate.asyncSendFifoMessage(fifoTopic, "fifo message", messageGroup, future1); System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, completableFuture1); CompletableFuture<SendReceipt> completableFuture2 = rocketMQClientTemplate.asyncSendDelayMessage(delayTopic, "delay message".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10), future2); System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, completableFuture2); }
demo.fifo.rocketmq.endpoints=localhost:8081 demo.fifo.rocketmq.topic=fifoTopic demo.fifo.rocketmq.consumer-group=fifoGroup demo.fifo.rocketmq.tag=*
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
@Service @RocketMQMessageListener(endpoints = "${demo.fifo.rocketmq.endpoints:}", topic = "${demo.fifo.rocketmq.topic:}", consumerGroup = "${demo.fifo.rocketmq.consumer-group:}", tag = "${demo.fifo.rocketmq.tag:}") public class FifoConsumer implements RocketMQListener { @Override public ConsumeResult consume(MessageView messageView) { System.out.println("handle my fifo message:" + messageView); return ConsumeResult.SUCCESS; } }
rocketmq.simple-consumer.endpoints=localhost:8081 rocketmq.simple-consumer.consumer-group=normalGroup rocketmq.simple-consumer.topic=normalTopic rocketmq.simple-consumer.tag=* rocketmq.simple-consumer.filter-expression-type=tag ext.rocketmq.topic=delayTopic
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
此时测验原始的RocketMQClientTemplate和我们拓展的ExtRocketMQTemplate是否有效:
@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}") public class ExtRocketMQTemplate extends RocketMQClientTemplate { }
@SpringBootApplication public class ClientConsumeApplication implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class); @Resource RocketMQClientTemplate rocketMQClientTemplate; @Resource(name = "extRocketMQTemplate") RocketMQClientTemplate extRocketMQTemplate; public static void main(String[] args) { SpringApplication.run(ClientConsumeApplication.class, args); } @Override public void run(String... args) throws Exception { receiveSimpleConsumerMessage(); receiveExtSimpleConsumerMessage(); } public void receiveSimpleConsumerMessage() throws ClientException { do { final List<MessageView> messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15)); log.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId()); final MessageId messageId = message.getMessageId(); try { rocketMQClientTemplate.ack(message); log.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } } while (true); } public void receiveExtSimpleConsumerMessage() throws ClientException { do { final List<MessageView> messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15)); log.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId()); final MessageId messageId = message.getMessageId(); try { rocketMQClientTemplate.ack(message); log.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } } while (true); } }
rocketmq.simple-consumer.endpoints=localhost:8081 rocketmq.simple-consumer.consumer-group=normalGroup rocketmq.simple-consumer.topic=normalTopic rocketmq.simple-consumer.tag=* rocketmq.simple-consumer.filter-expression-type=tag
public void receiveSimpleConsumerMessageAsynchronously() { do { int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); // Set individual thread pool for receive callback. ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool(); // Set individual thread pool for ack callback. ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool(); CompletableFuture<List<MessageView>> future0; try { future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration); } catch (ClientException | IOException e) { throw new RuntimeException(e); } future0.whenCompleteAsync(((messages, throwable) -> { if (null != throwable) { log.error("Failed to receive message from remote", throwable); // Return early. return; } log.info("Received {} message(s)", messages.size()); // Using messageView as key rather than message id because message id may be duplicated. final Map<MessageView, CompletableFuture<Void>> map = messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync)); for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) { final MessageId messageId = entry.getKey().getMessageId(); final CompletableFuture<Void> future = entry.getValue(); future.whenCompleteAsync((v, t) -> { if (null != t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); // Return early. return; } log.info("Message is acknowledged successfully, messageId={}", messageId); }, ackCallbackExecutor); } }), receiveCallbackExecutor); } while (true); }
rocketmq.producer.endpoints=localhost:8081 rocketmq.producer.topic=normalTopic rocketmq.producer.access-key=yourAccessKey rocketmq.producer.secret-key=yourSecretKey
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
@SpringBootApplication public class ClientProducerACLApplication implements CommandLineRunner { @Resource private RocketMQClientTemplate rocketMQClientTemplate; @Value("${demo.acl.rocketmq.normal-topic}") private String normalTopic; public static void main(String[] args) { SpringApplication.run(ClientProducerACLApplication.class, args); } @Override public void run(String... args) throws ClientException { testSendNormalMessage(); } void testSendNormalMessage() { SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage() .setId(1).setUserName("name").setUserAge((byte) 3)); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message"); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8)); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder. withPayload("test message".getBytes()).build()); System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt); } }
demo.acl.rocketmq.endpoints=localhost:8081 demo.acl.rocketmq.topic=normalTopic demo.acl.rocketmq.consumer-group=normalGroup demo.acl.rocketmq.tag=* demo.acl.rocketmq.access-key=yourAccessKey demo.acl.rocketmq.secret-key=yourSecretKey
注意: 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
@Service @RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}", endpoints = "${demo.acl.rocketmq.endpoints:}", topic = "${demo.acl.rocketmq.topic:}", consumerGroup = "${demo.acl.rocketmq.consumer-group:}", tag = "${demo.acl.rocketmq.tag:}") public class ACLConsumer implements RocketMQListener { @Override public ConsumeResult consume(MessageView messageView) { System.out.println("handle my acl message:" + messageView); return ConsumeResult.SUCCESS; } }