| # Schedule example |
| |
| ### 1 Start consumer to wait for incoming subscribed messages |
| |
| ```java |
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
| import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import java.util.List; |
| |
| public class ScheduledMessageConsumer { |
| |
| public static void main(String[] args) throws Exception { |
| // Instantiate message consumer |
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); |
| // Subscribe to the topic (all tags) |
| consumer.subscribe("TestTopic", "*"); |
| // Register message listener |
| consumer.registerMessageListener(new MessageListenerConcurrently() { |
| @Override |
| public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { |
| for (MessageExt message : messages) { |
| // Print approximate delay time period |
| System.out.println("Receive message[msgId=" + message.getMsgId() + "] " |
| + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); |
| } |
| return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
| } |
| }); |
| // Launch consumer |
| consumer.start(); |
| } |
| } |
| ``` |
| |
| ### 2 Send scheduled messages |
| |
| ```java |
| import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| import org.apache.rocketmq.common.message.Message; |
| |
| public class ScheduledMessageProducer { |
| |
| public static void main(String[] args) throws Exception { |
| // Instantiate a producer to send scheduled messages |
| DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); |
| // Launch producer |
| producer.start(); |
| int totalMessagesToSend = 100; |
| for (int i = 0; i < totalMessagesToSend; i++) { |
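| Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)); |
| // Delay level 3 maps to 10s in the default configuration, so this message is delivered to consumers about 10 seconds later. |
| message.setDelayTimeLevel(3); |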
| // Send the message |
| producer.send(message); |
| } |
| |
| // Shutdown producer after use. |
| producer.shutdown(); |
| } |
| |
| } |
| ``` |
| |
| ### 3 Verification |
| |
| You should see that each message is consumed about 10 seconds after its store time. |
| |
| ### 4 Use scenarios for scheduled messages |
| |
| For example, in e-commerce, when an order is submitted, a delayed message can be sent so that the order status is checked 1 hour later. If the order is still unpaid at that point, it can be cancelled and its inventory released. |
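The check that runs when such a delayed message arrives can be sketched in plain Java. This is only an illustration under stated assumptions: the class `OrderTimeoutSketch`, its in-memory order map, and the method names are hypothetical and not part of RocketMQ. In a real system, `submit` would also send a message with a delay level set (level 17 corresponds to `1h` in the default level list), and `onTimeoutCheck` would be called from the consumer's message listener.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the order-timeout scenario; not a RocketMQ API.
class OrderTimeoutSketch {

    enum Status { UNPAID, PAID, CANCELLED }

    private final Map<String, Status> orders = new HashMap<>();
    private int inventory;

    OrderTimeoutSketch(int initialInventory) {
        this.inventory = initialInventory;
    }

    // Reserve one item for the order. A real system would also send a delayed
    // message carrying the order id at this point.
    void submit(String orderId) {
        orders.put(orderId, Status.UNPAID);
        inventory--;
    }

    // Mark the order as paid, but only if it is still unpaid.
    void pay(String orderId) {
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    // Intended to run when the delayed message is consumed.
    // Cancels the order and releases inventory only if it is still unpaid,
    // so a redelivered message does not release the inventory twice.
    boolean onTimeoutCheck(String orderId) {
        if (orders.replace(orderId, Status.UNPAID, Status.CANCELLED)) {
            inventory++;
            return true;
        }
        return false;
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }

    int inventory() {
        return inventory;
    }

    public static void main(String[] args) {
        OrderTimeoutSketch shop = new OrderTimeoutSketch(10);
        shop.submit("order-1");
        shop.submit("order-2");
        shop.pay("order-1");
        System.out.println("order-1 cancelled: " + shop.onTimeoutCheck("order-1"));
        System.out.println("order-2 cancelled: " + shop.onTimeoutCheck("order-2"));
        System.out.println("inventory: " + shop.inventory());
    }
}
```

Making the check conditional on the `UNPAID` state keeps it idempotent, which matters because a message may be delivered more than once.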
| |
| ### 5 Restrictions on the use of scheduled messages |
| |
| ```java |
| // org/apache/rocketmq/store/config/MessageStoreConfig.java |
| |
| private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; |
| ``` |
| |
| Currently RocketMQ does not support arbitrary delay times. Only a fixed set of delay levels can be configured; with the default setting above, levels 1 to 18 correspond to delays from 1s to 2h. Messages that fail to be consumed also enter the delay message queue, and the time at which they are delivered again depends on the configured delay levels and the number of retries. |
| |
| See `SendMessageProcessor.java` |
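To make the mapping between delay levels and delay times concrete, the following sketch parses the default `messageDelayLevel` string into a level-to-milliseconds table and picks a level for a desired delay. It is a minimal illustration, not RocketMQ's own parsing code: the class `DelayLevelTable` and its methods are hypothetical, and it handles only the `s`, `m`, and `h` units that appear in the default value quoted above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the format of messageDelayLevel; not a RocketMQ API.
class DelayLevelTable {

    // Default value as quoted from MessageStoreConfig above.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parse the space-separated setting into level -> delay in milliseconds.
    // Levels are numbered from 1 in the order they appear.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] tokens = levels.trim().split("\\s+");
        for (int i = 0; i < tokens.length; i++) {
            String token = tokens[i];
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default:
                    throw new IllegalArgumentException("Unknown time unit in: " + token);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }

    // Return the first level whose delay is at least the requested one,
    // or -1 if no level is long enough. Assumes levels are in ascending order,
    // as they are in the default setting.
    static int levelFor(long desiredMillis, Map<Integer, Long> table) {
        for (Map.Entry<Integer, Long> entry : table.entrySet()) {
            if (entry.getValue() >= desiredMillis) {
                return entry.getKey();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println("level 3 -> " + table.get(3) + " ms");
        System.out.println("level for 1 hour -> " + levelFor(3_600_000L, table));
    }
}
```

With the default setting, a delay that falls between two levels (for example 45 minutes) has to be rounded to the next level (`1h`, level 17), since only the listed delays are available.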