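Sending Normal Messages

Modify application.properties

rocketmq.producer.topic: sets the topic name for the producer (optional, but recommended) so that the producer can prefetch the topic route before publishing messages.
demo.rocketmq.normal-topic: user-defined topic to which messages are sent.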

rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=normalTopic
demo.rocketmq.normal-topic=normalTopic

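Note: replace 127.0.0.1:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

Use the @Value annotation to inject the configuration parameter that specifies the custom topic.
Use the @Resource annotation to inject the RocketMQClientTemplate bean.
Call RocketMQClientTemplate#syncSendNormalMessage to send normal messages (the message argument may be an Object, String, byte[], or Message).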

@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);

    @Value("${demo.rocketmq.normal-topic}")
    private String normalTopic;
    
    @Resource
    private RocketMQClientTemplate rocketMQClientTemplate;
    
    public static void main(String[] args) {
        SpringApplication.run(ClientProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws ClientException {
        testSendNormalMessage();
    }

    //Test sending normal message
    void testSendNormalMessage() {
        SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
                .setId(1).setUserName("name").setUserAge((byte) 3));
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
                withPayload("test message".getBytes()).build());
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
    }

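    // Lombok: @Accessors(chain = true) makes the setters return this, which the chained calls above rely on;
    // @NoArgsConstructor provides the no-argument constructor used by new UserMessage().
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Accessors(chain = true)
    public static class UserMessage implements Serializable {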
        private int id;
        private String userName;
        private Byte userAge;
    }

}

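Sending FIFO Messages

Modify application.properties

rocketmq.producer.topic: sets the topic name for the producer (optional, but recommended) so that the producer can prefetch the topic route before publishing messages.
demo.rocketmq.fifo-topic: user-defined topic to which messages are sent.
demo.rocketmq.message-group: the message group used for ordering. The order of FIFO messages is determined and identified by message group (MessageGroup). Each FIFO message must be assigned to a message group when it is sent; messages in the same message group follow first-in, first-out order, while no ordering applies between different message groups or to messages without a message group.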

rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=fifoTopic
demo.rocketmq.fifo-topic=fifoTopic
demo.rocketmq.message-group=group1

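Note: replace 127.0.0.1:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

Use the @Value annotation to inject the configuration parameters that specify the custom topic and the message group.
Use the @Resource annotation to inject the RocketMQClientTemplate bean.
Call RocketMQClientTemplate#syncSendFifoMessage to send FIFO messages (the message argument may be an Object, String, byte[], or Message).
When sending a FIFO message, you must also pass the message group (MessageGroup).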

@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);

    @Value("${demo.rocketmq.fifo-topic}")
    private String fifoTopic;
    
    @Value("${demo.rocketmq.message-group}")
    private String messageGroup;

    @Resource
    private RocketMQClientTemplate rocketMQClientTemplate;
    
    public static void main(String[] args) {
        SpringApplication.run(ClientProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws ClientException {
        testSendFIFOMessage();
    }

    //Test sending fifo message
    void testSendFIFOMessage() {
        SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage()
                .setId(1).setUserName("name").setUserAge((byte) 3), messageGroup);
        System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder.
                withPayload("test message".getBytes()).build(), messageGroup);
        System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup);
        System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup);
        System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
    }

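    // Lombok: @Accessors(chain = true) makes the setters return this, which the chained calls above rely on;
    // @NoArgsConstructor provides the no-argument constructor used by new UserMessage().
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Accessors(chain = true)
    public static class UserMessage implements Serializable {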
        private int id;
        private String userName;
        private Byte userAge;
    }
    
}
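
The message-group rule described in this section can be illustrated without RocketMQ at all. The following is a minimal sketch under that assumption: MessageGroupOrderingSketch and its methods are hypothetical names, and the class only models the guarantee that messages in the same message group keep their send order, while nothing is promised across groups.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of message-group ordering; this is not RocketMQ code.
final class MessageGroupOrderingSketch {

    // One FIFO list per message group, in the order the messages were sent.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Record a message under its message group, preserving send order within the group.
    void send(String messageGroup, String body) {
        queues.computeIfAbsent(messageGroup, g -> new ArrayList<>()).add(body);
    }

    // Messages of one group, in the order they would have to be delivered.
    List<String> deliveryOrder(String messageGroup) {
        return new ArrayList<>(queues.getOrDefault(messageGroup, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        MessageGroupOrderingSketch sketch = new MessageGroupOrderingSketch();
        sketch.send("group1", "order-created");
        sketch.send("group2", "unrelated-event");
        sketch.send("group1", "order-paid");
        // The two group1 messages stay in send order regardless of the group2 message between them.
        System.out.println(sketch.deliveryOrder("group1"));
    }
}
```

In practice this suggests choosing the message group from a business key (for example an order ID), so that only messages that must be ordered relative to each other share a group.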

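Sending Delay Messages

Modify application.properties

rocketmq.producer.topic: sets the topic name for the producer (optional, but recommended) so that the producer can prefetch the topic route before publishing messages.
demo.rocketmq.delay-topic: user-defined topic to which messages are sent.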

rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=delayTopic
demo.rocketmq.delay-topic=delayTopic

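Note: replace 127.0.0.1:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

Use the @Value annotation to inject the configuration parameter that specifies the custom topic.
Use the @Resource annotation to inject the RocketMQClientTemplate bean.
Call RocketMQClientTemplate#syncSendDelayMessage to send delay messages (the message argument may be an Object, String, byte[], or Message).
When sending a delay message, you must specify the delay (DeliveryTimestamp); the sample code passes it as a Duration.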

@SpringBootApplication
public class ClientProducerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);

    @Value("${demo.rocketmq.delay-topic}")
    private String delayTopic;
    
    @Resource
    private RocketMQClientTemplate rocketMQClientTemplate;
    public static void main(String[] args) {
        SpringApplication.run(ClientProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws ClientException {
        testSendDelayMessage();
    }

    //Test sending delay message
    void testSendDelayMessage() {
        
        SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage()
                .setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10));
        System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder.
                withPayload("test message".getBytes()).build(), Duration.ofSeconds(30));
        System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message",
                Duration.ofSeconds(60));
        System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8),
                Duration.ofSeconds(90));
        System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
    }

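    // Lombok: @Accessors(chain = true) makes the setters return this, which the chained calls above rely on;
    // @NoArgsConstructor provides the no-argument constructor used by new UserMessage().
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Accessors(chain = true)
    public static class UserMessage implements Serializable {
        private int id;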
        private String userName;
        private Byte userAge;
    }

}
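
The sample above passes a relative Duration, while this section names the setting DeliveryTimestamp. A plausible reading, stated here as an assumption rather than as the template's actual implementation, is that the delay is added to the send time to obtain an absolute delivery time. The sketch below shows only that arithmetic; DeliveryTimestampSketch and deliveryTimestampMillis are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: turns a relative delay into an absolute delivery time.
// Assumption: delivery timestamp = send time + delay. This is not RocketMQ code.
final class DeliveryTimestampSketch {

    // Absolute delivery time in epoch milliseconds for a message sent at sendTime.
    static long deliveryTimestampMillis(Instant sendTime, Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return sendTime.plus(delay).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        long deliverAt = deliveryTimestampMillis(now, Duration.ofSeconds(10));
        System.out.printf("sent at %d, deliver no earlier than %d%n", now.toEpochMilli(), deliverAt);
    }
}
```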

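Sending Transactional Messages

Modify application.properties

rocketmq.producer.topic: sets the topic name for the producer (optional, but recommended) so that the producer can prefetch the topic route before publishing messages.
demo.rocketmq.trans-topic: user-defined topic to which messages are sent.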

rocketmq.producer.endpoints=127.0.0.1:8081
rocketmq.producer.topic=transTopic
demo.rocketmq.trans-topic=transTopic

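Note: replace 127.0.0.1:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

Use the @Value annotation to inject the configuration parameter that specifies the custom topic.
Use the @Resource annotation to inject the RocketMQClientTemplate bean.
Call RocketMQClientTemplate#sendMessageInTransaction to send transactional messages (the message argument may be an Object, String, byte[], or Message).
A successful send returns a Pair: its left value is the SendReceipt and its right value is the Transaction, which lets you commit or roll back according to the outcome of your local transaction.
Annotate a custom class with @RocketMQTransactionListener; the class must implement the RocketMQTransactionChecker interface and override the TransactionResolution check(MessageView messageView) method.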

    void testSendTransactionMessage() throws ClientException {
        Pair<SendReceipt, Transaction> pair;
        SendReceipt sendReceipt;
        try {
            pair = rocketMQClientTemplate.sendMessageInTransaction(transTopic, MessageBuilder.
                    withPayload(new UserMessage()
                            .setId(1).setUserName("name").setUserAge((byte) 3)).setHeader("OrderId", 1).build());
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
        sendReceipt = pair.getSendReceipt();
        System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt);
        Transaction transaction = pair.getTransaction();
        // executed local transaction
        if (doLocalTransaction(1)) {
            transaction.commit();
        } else {
            transaction.rollback();
        }
    }

    @RocketMQTransactionListener
    static class TransactionListenerImpl implements RocketMQTransactionChecker {
        @Override
        public TransactionResolution check(MessageView messageView) {
            if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
                log.info("Receive transactional message check, message={}", messageView);
                return TransactionResolution.COMMIT;
            }
            log.info("rollback transaction");
            return TransactionResolution.ROLLBACK;
        }
    }

    boolean doLocalTransaction(int number) {
        log.info("execute local transaction");
        return number > 0;
    }
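
The two decision points in the sample above can be separated into a small, RocketMQ-free model. This is a sketch under assumptions: TransactionFlowSketch and its Resolution enum are hypothetical stand-ins (not the library's TransactionResolution), and it assumes the usual arrangement in which the checker is consulted only when the broker has not yet received an explicit commit or rollback from the producer.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical model of the commit/rollback decision; these types are not RocketMQ classes.
final class TransactionFlowSketch {

    enum Resolution { COMMIT, ROLLBACK }

    // Step 1: after the message is sent, the producer resolves it from the local transaction outcome.
    static Resolution resolveAfterLocalTransaction(boolean localTransactionSucceeded) {
        return localTransactionSucceeded ? Resolution.COMMIT : Resolution.ROLLBACK;
    }

    // Step 2 (fallback): a checker decides from the message properties alone,
    // mirroring the OrderId test in the listener above.
    static Resolution check(Map<String, String> properties) {
        return properties.containsKey("OrderId") ? Resolution.COMMIT : Resolution.ROLLBACK;
    }

    public static void main(String[] args) {
        System.out.println(resolveAfterLocalTransaction(true));
        System.out.println(check(Collections.singletonMap("OrderId", "1")));
        System.out.println(check(Collections.<String, String>emptyMap()));
    }
}
```

A real checker would normally look up the state of the local transaction (for example by the order ID carried in the message) instead of committing whenever the property is present.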

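Sending Messages Asynchronously

Modify application.properties

rocketmq.producer.topic: sets the topic name for the producer (optional, but recommended) so that the producer can prefetch the topic route before publishing messages.
demo.rocketmq.normal-topic, demo.rocketmq.fifo-topic, demo.rocketmq.delay-topic: user-defined topics to which the normal, FIFO, and delay messages are sent.
demo.rocketmq.message-group: message group used for the FIFO message.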

rocketmq.producer.endpoints=127.0.0.1:8081
demo.rocketmq.fifo-topic=fifoTopic
demo.rocketmq.delay-topic=delayTopic
demo.rocketmq.normal-topic=normalTopic
demo.rocketmq.message-group=group1

Note: replace 127.0.0.1:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

    void testAsyncSendMessage() {

        CompletableFuture<SendReceipt> future0 = new CompletableFuture<>();
        CompletableFuture<SendReceipt> future1 = new CompletableFuture<>();
        CompletableFuture<SendReceipt> future2 = new CompletableFuture<>();
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();

        future0.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);

        future1.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);

        future2.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);

        CompletableFuture<SendReceipt> completableFuture0 = rocketMQClientTemplate.asyncSendNormalMessage(normalTopic, new UserMessage()
                .setId(1).setUserName("name").setUserAge((byte) 3), future0);
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, completableFuture0);

        CompletableFuture<SendReceipt> completableFuture1 = rocketMQClientTemplate.asyncSendFifoMessage(fifoTopic, "fifo message",
                messageGroup, future1);
        System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, completableFuture1);

        CompletableFuture<SendReceipt> completableFuture2 = rocketMQClientTemplate.asyncSendDelayMessage(delayTopic,
                "delay message".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10), future2);
        System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, completableFuture2);
    }
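
The callback pattern used above (create a CompletableFuture, attach whenCompleteAsync, then hand the future to the send call) can be tried in isolation. The following is a minimal sketch using only java.util.concurrent; AsyncCallbackSketch and attachCallback are hypothetical names, a String stands in for SendReceipt, and the future is completed by hand where the client would complete it once the broker replies.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-alone demo of the success/failure callback; no RocketMQ classes are used.
final class AsyncCallbackSketch {

    // Attaches a callback that runs on executor and records one line per outcome.
    // The returned future completes only after the callback has run.
    static CompletableFuture<String> attachCallback(CompletableFuture<String> future,
                                                    List<String> outcomes,
                                                    ExecutorService executor) {
        return future.whenCompleteAsync((receipt, throwable) -> {
            if (throwable != null) {
                outcomes.add("failed: " + throwable.getMessage());
                return;
            }
            outcomes.add("sent: " + receipt);
        }, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> outcomes = new CopyOnWriteArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> done = attachCallback(future, outcomes, executor);
        // Stand-in for the client completing the future when the send finishes.
        future.complete("receipt-1");
        done.join();
        System.out.println(outcomes);
        executor.shutdown();
    }
}
```

Running callbacks on a dedicated executor, as the sample does with sendCallbackExecutor, keeps slow callback logic off the thread that completes the future; remember to shut the executor down when the application stops.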

Receiving Messages

Push Mode

Modify application.properties

demo.fifo.rocketmq.endpoints=localhost:8081
demo.fifo.rocketmq.topic=fifoTopic
demo.fifo.rocketmq.consumer-group=fifoGroup
demo.fifo.rocketmq.tag=*

Note: replace localhost:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

@Service
@RocketMQMessageListener(endpoints = "${demo.fifo.rocketmq.endpoints:}", topic = "${demo.fifo.rocketmq.topic:}",
        consumerGroup = "${demo.fifo.rocketmq.consumer-group:}", tag = "${demo.fifo.rocketmq.tag:}")
public class FifoConsumer implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        System.out.println("handle my fifo message:" + messageView);
        return ConsumeResult.SUCCESS;
    }
}
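
The tag property in the configuration above is set to *, which subscribes to every message in the topic. As a rough illustration of what a tag filter expression selects, here is a small stand-alone matcher. It is a sketch only: TagFilterSketch is a hypothetical helper, it assumes the common RocketMQ convention that * matches everything and that several tags are separated by ||, and it does not reproduce how the broker actually evaluates filters.

```java
import java.util.Arrays;

// Hypothetical illustration of tag filter semantics; not the broker's implementation.
final class TagFilterSketch {

    // Returns true when messageTag is selected by expression.
    // Assumed syntax: "*" selects everything, "tagA||tagB" selects either tag.
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if ("*".equals(trimmed)) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        return Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "anything"));
        System.out.println(matches("tagA||tagB", "tagB"));
        System.out.println(matches("tagA||tagB", "tagC"));
    }
}
```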

Simple Mode

Synchronous Subscription

Modify application.properties
rocketmq.simple-consumer.endpoints=localhost:8081
rocketmq.simple-consumer.consumer-group=normalGroup
rocketmq.simple-consumer.topic=normalTopic
rocketmq.simple-consumer.tag=*
rocketmq.simple-consumer.filter-expression-type=tag
ext.rocketmq.topic=delayTopic

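Note: replace localhost:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints.

Write the code

This example checks that both the original RocketMQClientTemplate and the extended ExtRocketMQTemplate work:

  1. First define the extension ExtRocketMQTemplate; it must be annotated with @ExtConsumerResetConfiguration, which specifies key fields such as the topic.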
@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}")
public class ExtRocketMQTemplate extends RocketMQClientTemplate {
}
  2. The receiveSimpleConsumerMessage method consumes messages from topic=normalTopic, and the receiveExtSimpleConsumerMessage method consumes messages from topic=delayTopic.
@SpringBootApplication
public class ClientConsumeApplication implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class);

    @Resource
    RocketMQClientTemplate rocketMQClientTemplate;

    @Resource(name = "extRocketMQTemplate")
    RocketMQClientTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ClientConsumeApplication.class, args);
    }

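    @Override
    public void run(String... args) throws Exception {
        // Each receive method below loops forever, so as written the second call is never reached;
        // run the two loops on separate threads to exercise both templates.
        receiveSimpleConsumerMessage();
        receiveExtSimpleConsumerMessage();
    }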

    public void receiveSimpleConsumerMessage() throws ClientException {
        do {
            final List<MessageView> messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15));
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
                final MessageId messageId = message.getMessageId();
                try {
                    rocketMQClientTemplate.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
    }

    public void receiveExtSimpleConsumerMessage() throws ClientException {
        do {
            final List<MessageView> messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15));
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
                final MessageId messageId = message.getMessageId();
                try {
                    extRocketMQTemplate.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
    }

}
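
Both receive methods above loop forever, so calling them one after the other on the same thread means only the first one ever runs. One way to exercise both, sketched here with plain java.util.concurrent and no RocketMQ classes, is to give each loop its own thread. ParallelReceiveLoopsSketch and runLoops are hypothetical names, and the Runnable arguments stand in for the two receive methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for running several polling loops side by side.
final class ParallelReceiveLoopsSketch {

    // Starts every loop on the given pool and returns a future that completes when all loops return.
    // The pool needs at least as many threads as there are loops, or a blocking loop can starve the rest.
    static CompletableFuture<Void> runLoops(ExecutorService pool, Runnable... loops) {
        CompletableFuture<?>[] running = new CompletableFuture<?>[loops.length];
        for (int i = 0; i < loops.length; i++) {
            running[i] = CompletableFuture.runAsync(loops[i], pool);
        }
        return CompletableFuture.allOf(running);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger normalPolls = new AtomicInteger();
        AtomicInteger extPolls = new AtomicInteger();
        // Bounded loops stand in for the endless receive loops so that the demo terminates.
        runLoops(pool,
                () -> { for (int i = 0; i < 3; i++) normalPolls.incrementAndGet(); },
                () -> { for (int i = 0; i < 3; i++) extPolls.incrementAndGet(); }
        ).join();
        pool.shutdown();
        System.out.printf("normal polls=%d, ext polls=%d%n", normalPolls.get(), extPolls.get());
    }
}
```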

Asynchronous Subscription

Modify application.properties
rocketmq.simple-consumer.endpoints=localhost:8081
rocketmq.simple-consumer.consumer-group=normalGroup
rocketmq.simple-consumer.topic=normalTopic
rocketmq.simple-consumer.tag=*
rocketmq.simple-consumer.filter-expression-type=tag

Write the code
    public void receiveSimpleConsumerMessageAsynchronously() {
        do {
            int maxMessageNum = 16;
            // Set message invisible duration after it is received.
            Duration invisibleDuration = Duration.ofSeconds(15);
            // Set individual thread pool for receive callback.
            ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
            // Set individual thread pool for ack callback.
            ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
            CompletableFuture<List<MessageView>> future0;
            try {
                future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration);
            } catch (ClientException | IOException e) {
                throw new RuntimeException(e);
            }
            future0.whenCompleteAsync(((messages, throwable) -> {
                if (null != throwable) {
                    log.error("Failed to receive message from remote", throwable);
                    // Return early.
                    return;
                }
                log.info("Received {} message(s)", messages.size());
                // Using messageView as key rather than message id because message id may be duplicated.
                final Map<MessageView, CompletableFuture<Void>> map =
                        messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync));
                for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
                    final MessageId messageId = entry.getKey().getMessageId();
                    final CompletableFuture<Void> future = entry.getValue();
                    future.whenCompleteAsync((v, t) -> {
                        if (null != t) {
                            log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                            // Return early.
                            return;
                        }
                        log.info("Message is acknowledged successfully, messageId={}", messageId);
                    }, ackCallbackExecutor);
                }

            }), receiveCallbackExecutor);
        } while (true);
    }
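
The receive calls in this section take a maximum batch size and an invisible duration. The effect of those two parameters, together with ack, can be modelled in memory. This is a sketch under assumptions: InvisibleDurationSketch is a hypothetical class, time is passed in explicitly to keep the example deterministic, and it assumes that a received message stays hidden from later receive calls until it is acknowledged or the invisible duration elapses, after which it can be delivered again.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of receive / invisible duration / ack; not RocketMQ code.
final class InvisibleDurationSketch {

    // Message id -> instant from which the message is visible to receive().
    private final Map<String, Instant> visibleAt = new LinkedHashMap<>();

    // A newly published message is visible immediately.
    void publish(String messageId, Instant now) {
        visibleAt.put(messageId, now);
    }

    // Returns up to maxMessageNum visible messages and hides each one for invisibleDuration.
    List<String> receive(int maxMessageNum, Duration invisibleDuration, Instant now) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : visibleAt.entrySet()) {
            if (batch.size() == maxMessageNum) {
                break;
            }
            if (!entry.getValue().isAfter(now)) {
                entry.setValue(now.plus(invisibleDuration));
                batch.add(entry.getKey());
            }
        }
        return batch;
    }

    // An acknowledged message is removed and never delivered again.
    void ack(String messageId) {
        visibleAt.remove(messageId);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.EPOCH;
        InvisibleDurationSketch queue = new InvisibleDurationSketch();
        queue.publish("m1", t0);
        queue.publish("m2", t0);
        System.out.println(queue.receive(16, Duration.ofSeconds(15), t0));
        queue.ack("m1");
        // m2 was received but not acknowledged, so it reappears once 15 seconds have passed.
        System.out.println(queue.receive(16, Duration.ofSeconds(15), t0.plusSeconds(15)));
    }
}
```

Under this model, the invisible duration should be longer than the time needed to process and acknowledge a batch; otherwise the same message may be handed out again while it is still being processed.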

ACL

Producer Side

Modify application.properties

rocketmq.producer.endpoints=localhost:8081
rocketmq.producer.topic=normalTopic
rocketmq.producer.access-key=yourAccessKey
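rocketmq.producer.secret-key=yourSecretKey
demo.acl.rocketmq.normal-topic=normalTopic

Note: replace localhost:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints, and replace the AccessKey and SecretKey with your real credentials. The demo.acl.rocketmq.normal-topic property is the one read by the @Value annotation in the sample code.

Write the code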

@SpringBootApplication
public class ClientProducerACLApplication implements CommandLineRunner {

    @Resource
    private RocketMQClientTemplate rocketMQClientTemplate;

    @Value("${demo.acl.rocketmq.normal-topic}")
    private String normalTopic;


    public static void main(String[] args) {
        SpringApplication.run(ClientProducerACLApplication.class, args);
    }

    @Override
    public void run(String... args) throws ClientException {
        testSendNormalMessage();
    }

    void testSendNormalMessage() {
        SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
                .setId(1).setUserName("name").setUserAge((byte) 3));
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);

        sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
                withPayload("test message".getBytes()).build());
        System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
    }
}

Consumer Side

Modify application.properties

demo.acl.rocketmq.endpoints=localhost:8081
demo.acl.rocketmq.topic=normalTopic
demo.acl.rocketmq.consumer-group=normalGroup
demo.acl.rocketmq.tag=*
demo.acl.rocketmq.access-key=yourAccessKey
demo.acl.rocketmq.secret-key=yourSecretKey

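Note: replace localhost:8081 in the sample configuration above with the address and port of your actual RocketMQ endpoints, and replace the AccessKey and SecretKey with your real credentials.

Write the code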

@Service
@RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}", endpoints = "${demo.acl.rocketmq.endpoints:}", topic = "${demo.acl.rocketmq.topic:}",
        consumerGroup = "${demo.acl.rocketmq.consumer-group:}", tag = "${demo.acl.rocketmq.tag:}")
public class ACLConsumer implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        System.out.println("handle my acl message:" + messageView);
        return ConsumeResult.SUCCESS;
    }
}