DefaultPullConsumer


类简介

  1. DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer

  2. DefaultMQPullConsumer主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。

  3. 优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。

  4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。


import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; public class MQPullConsumer { private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); // 从指定topic中拉取所有消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic"); for(MessageQueue mq:mqs){ try { // 获取消息的offset,指定从store中获取 long offset = consumer.fetchConsumeOffset(mq,true); System.out.println("consumer from the queue:"+mq+":"+offset); while(true){ PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); switch(pullResult.getPullStatus()){ case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; } } } catch (Exception e) { e.printStackTrace(); } } consumer.shutdown(); } // 保存上次消费的消息下标 private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset); } // 获取上次消费的消息的下标 private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l; } }

字段摘要

类型字段名称描述
DefaultMQPullConsumerImpldefaultMQPullConsumerImplDefaultMQPullConsumer的内部核心处理默认实现
StringconsumerGroup消费的唯一分组
longbrokerSuspendMaxTimeMillisconsumer取连接broker的最大延迟时间,不建议修改
longconsumerTimeoutMillisWhenSuspendpull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改
longconsumerPullTimeoutMillissocket连接的最大超时时间,不建议修改
StringmessageModel默认cluster模式
intmessageQueueListener消息queue监听器,用来获取topic的queue变化
intoffsetStoreRemoteBrokerOffsetStore 远程与本地offset存储器
intregisterTopics注册到该consumer的topic集合
intallocateMessageQueueStrategyconsumer的默认获取queue的负载分配策略算法

构造方法摘要

方法名称方法描述
DefaultMQPullConsumer()由默认参数值创建一个Pull消费者
DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook)使用指定的分组名,hook创建一个消费者
DefaultMQPullConsumer(final String consumerGroup)使用指定的分组名消费者
DefaultMQPullConsumer(RPCHook rpcHook)使用指定的hook创建一个生产者

使用方法摘要

返回值方法名称方法描述
MQAdmin接口method-------------------
voidcreateTopic(String key, String newTopic, int queueNum)在broker上创建指定的topic
voidcreateTopic(String key, String newTopic, int queueNum, int topicSysFlag)在broker上创建指定的topic
longearliestMsgStoreTime(MessageQueue mq)查询最早的消息存储时间
longmaxOffset(MessageQueue mq)查询给定消息队列的最大offset
longminOffset(MessageQueue mq)查询给定消息队列的最小offset
QueryResultqueryMessage(String topic, String key, int maxNum, long begin, long end)按关键字查询消息
longsearchOffset(MessageQueue mq, long timestamp)查找指定时间的消息队列的物理offset
MessageExtviewMessage(String offsetMsgId)根据给定的msgId查询消息
MessageExtpublic MessageExt viewMessage(String topic, String msgId)根据给定的msgId查询消息,并指定topic
MQConsumer接口method-------------------
SetfetchSubscribeMessageQueues(String topic)根据topic获取订阅的Queue
voidsendMessageBack(final MessageExt msg, final int delayLevel)如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
voidsendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
MQPullConsumer接口method-------------------
longfetchConsumeOffset(MessageQueue mq, boolean fromStore)查询给定消息队列的最大offset
PullResultpull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums)异步拉取制定匹配的消息
PullResultpull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final long timeout)异步拉取制定匹配的消息
PullResultpull(final MessageQueue mq, final MessageSelector selector, final long offset,final int maxNums)异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType
PullResultpullBlockIfNotFound(final MessageQueue mq, final String subExpression,final long offset, final int maxNums)异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis
voidpullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final PullCallback pullCallback)异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费
voidupdateConsumeOffset(final MessageQueue mq, final long offset)更新指定mq的offset
longfetchMessageQueuesInBalance(String topic)根据topic获取订阅的Queue(是balance分配后的)
voidvoid sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费
voidshutdown()关闭当前消费者实例并释放相关资源
voidstart()启动消费者