examples:
create a new rocketMqMange instance(nameServerAddr ...)
create a new consumer instance(topic/tag/listener ...)(now only support cluster/concurrent)
consumer register to rocketMqMange
rocketMqMange start
register ClientRequestProcessor
Start All Task
update Topic Route Info by consumer subscription data (topic route info data get from name server)
put them into local memory(BrokerAddrTable/TopicPublishInfoTable/TopicSubscribeInfoTable/TopicRouteTable)
prepare heartbeat data(all consumer and producer data in this client)
send it to all brokers.(broker data is from BrokerAddrTable)
(only broker know the distribution of the consumers we can rebalance)
for each MqClientManager.ClientFactory‘s consumers,invoke consumer.rebalance’s DoRebalance method
(after rebalance we can know the (topic/consumer group) should consume from which broker which queue)
put them into local memory(processQueueTable)
enqueue pull message request (chan *model.PullRequest)
dequeue pull message request and pull message from broker,when get messages to consume, put them into consume request,consume request handler will call the listener consume the message
enqueue a new pull message request and commit our consume offset to broker
when message cost too many time,we will drop this message(send message back) (for example 30 mins)