| # Schedule example |
| |
| ### 1 启动消费者等待传入的订阅消息 |
| |
| ```java |
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
| import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import java.util.List; |
| |
| public class ScheduledMessageConsumer { |
| |
| public static void main(String[] args) throws Exception { |
| // Instantiate message consumer |
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); |
| // Subscribe topics |
| consumer.subscribe("TestTopic", "*"); |
| // Register message listener |
| consumer.registerMessageListener(new MessageListenerConcurrently() { |
| @Override |
| public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { |
| for (MessageExt message : messages) { |
| // Print approximate delay time period |
| System.out.println("Receive message[msgId=" + message.getMsgId() + "] " |
| + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); |
| } |
| return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
| } |
| }); |
| // Launch consumer |
| consumer.start(); |
| } |
| } |
| ``` |
| |
| ### 2 发送延迟消息 |
| |
| ```java |
| import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| import org.apache.rocketmq.common.message.Message; |
| |
| public class ScheduledMessageProducer { |
| |
| public static void main(String[] args) throws Exception { |
| // Instantiate a producer to send scheduled messages |
| DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); |
| // Launch producer |
| producer.start(); |
| int totalMessagesToSend = 100; |
| for (int i = 0; i < totalMessagesToSend; i++) { |
| Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); |
| // This message will be delivered to consumer 10 seconds later. |
| message.setDelayTimeLevel(3); |
| // Send the message |
| producer.send(message); |
| } |
| |
| // Shutdown producer after use. |
| producer.shutdown(); |
| } |
| |
| } |
| ``` |
| |
| ### 3 确认 |
| |
| 您应该会看到消息在其存储时间后大约 10 秒被消耗。 |
| |
| ### 4 延迟消息的使用场景 |
| |
| 例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。 |
| |
| ### 5 使用延迟消息的限制 |
| |
| ```java |
| // org/apache/rocketmq/store/config/MessageStoreConfig.java |
| |
| private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; |
| ``` |
| |
| 当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。 |
| |
| See `SendMessageProcessor.java` |