/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consumer
import (
"context"
"errors"
"fmt"
"github.com/apache/rocketmq-client-go/kernel"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"math"
"os"
"strconv"
"time"
)
// In most scenarios, this is the recommended way to consume messages.
//
// Technically speaking, this push client is a wrapper around the underlying pull service: whenever
// messages pulled from brokers arrive, it invokes the registered callback handler to process them.
//
// See quick start/Consumer in the example module for a typical usage.
//
// Thread Safety: after initialization, the instance can be regarded as thread-safe.
type ConsumeResult int
const (
Mb = 1024 * 1024
ConsumeSuccess ConsumeResult = iota
ConsumeRetryLater
)
type PushConsumer interface {
Start() error
Shutdown()
Subscribe(topic string, selector MessageSelector,
f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
}
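// A minimal usage sketch. The group name, topic and name server address below are illustrative
// placeholders, not values defined in this file; Subscribe has to be called before Start (see the
// state check in Subscribe):
//
//	c, err := NewPushConsumer("testGroup", ConsumerOption{NameServerAddr: "127.0.0.1:9876"})
//	if err != nil {
//		// handle error
//	}
//	err = c.Subscribe("testTopic", MessageSelector{}, func(ctx *ConsumeMessageContext,
//		msgs []*kernel.MessageExt) (ConsumeResult, error) {
//		// process msgs
//		return ConsumeSuccess, nil
//	})
//	err = c.Start()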
type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consume func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
submitToConsume func(*processQueue, *kernel.MessageQueue)
subscribedTopic map[string]string
}
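// NewPushConsumer creates a push consumer for the given consumer group. It verifies and exports the
// name server address from the option, picks the queue allocating strategy (averagely by default)
// and wires either the orderly or the concurrent consume service according to opt.ConsumeOrderly.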
func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error) {
if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
return nil, err
}
opt.InstanceName = "DEFAULT"
opt.ClientIP = utils.LocalIP()
if opt.NameServerAddr == "" {
rlog.Fatal("opt.NameServerAddr can't be empty")
}
err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
if err != nil {
rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
}
dc := &defaultConsumer{
consumerGroup: consumerGroup,
cType: _PushConsume,
state: kernel.StateCreateJust,
prCh: make(chan PullRequest, 4),
model: opt.ConsumerModel,
consumeOrderly: opt.ConsumeOrderly,
fromWhere: opt.FromWhere,
option: opt,
}
switch opt.Strategy {
case StrategyAveragely:
dc.allocate = allocateByAveragely
case StrategyAveragelyCircle:
dc.allocate = allocateByAveragelyCircle
case StrategyConfig:
dc.allocate = allocateByConfig
case StrategyConsistentHash:
dc.allocate = allocateByConsistentHash
case StrategyMachineNearby:
dc.allocate = allocateByMachineNearby
case StrategyMachineRoom:
dc.allocate = allocateByMachineRoom
default:
dc.allocate = allocateByAveragely
}
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
p.submitToConsume = p.consumeMessageOrderly
} else {
p.submitToConsume = p.consumeMessageCurrently
}
return p, nil
}
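// Start initializes the consumer exactly once: it registers the retry topic (clustering mode),
// creates the remote or local offset store, launches a goroutine that dispatches incoming
// PullRequests to pullMessage, registers the consumer with the client, and finally triggers a
// rebalance plus a heartbeat to all brokers. Subscribe must be called first, because validate
// aborts the process when no topic has been subscribed yet.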
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
rlog.Infof("the consumerGroup=%s start beginning. messageModel=%v, unitMode=%v",
pc.consumerGroup, pc.model, pc.unitMode)
pc.state = kernel.StateStartFailed
pc.validate()
if pc.model == Clustering {
// set retry topic
retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
MessageSelector{TAG, _SubAll}))
}
pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
if pc.model == Clustering {
pc.option.ChangeInstanceNameToPID()
pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
} else {
pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, pc.client.ClientID())
}
go func() {
// todo start clean msg expired
// TODO quit
for {
pr := <-pc.prCh
go func() {
pc.pullMessage(&pr)
}()
}
}()
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
pc.state = kernel.StateCreateJust
rlog.Errorf("the consumer group: [%s] has been created, specify another name.", pc.consumerGroup)
err = errors.New("consumer group has been created")
return
}
pc.client.UpdateTopicRouteInfo()
pc.client.Start()
pc.state = kernel.StateRunning
})
pc.client.UpdateTopicRouteInfo()
for k := range pc.subscribedTopic {
_, exist := pc.topicSubscribeInfoTable.Load(k)
if !exist {
pc.client.Shutdown()
return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
}
}
pc.client.RebalanceImmediately()
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
return err
}
func (pc *pushConsumer) Shutdown() {}
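// Subscribe registers a topic with its selector and the consume callback. It is only allowed while
// the consumer is still in the CreateJust state, i.e. before Start. Note that the callback is stored
// in a single field, so the handler passed to the last Subscribe call is used for all subscribed topics.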
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error {
if pc.state != kernel.StateCreateJust {
return errors.New("subscribe topic only started before")
}
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
pc.consume = f
return nil
}
func (pc *pushConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
}
func (pc *pushConsumer) PersistConsumerOffset() {
pc.defaultConsumer.persistConsumerOffset()
}
func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
}
func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
}
func (pc *pushConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
return pc.defaultConsumer.SubscriptionDataList()
}
func (pc *pushConsumer) IsUnitMode() bool {
return pc.unitMode
}
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*kernel.MessageQueue) {
// TODO
}
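// validate checks the consumer group and the option ranges. Zero values are replaced with defaults
// (e.g. ConsumeConcurrentlyMaxSpan=1000, PullThresholdForQueue=1024, PullBatchSize=32), while values
// outside the allowed ranges abort the process via rlog.Fatal.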
func (pc *pushConsumer) validate() {
kernel.ValidateGroup(pc.consumerGroup)
if pc.consumerGroup == kernel.DefaultConsumerGroup {
// TODO FAQ
rlog.Fatalf("consumerGroup can't equal [%s], please specify another one.", kernel.DefaultConsumerGroup)
}
if len(pc.subscribedTopic) == 0 {
rlog.Fatal("number of subscribed topics is 0.")
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
}
}
if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
rlog.Fatal("option.PullThresholdForQueue out of range [1, 65535]")
}
}
if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
rlog.Fatal("option.PullThresholdForTopic out of range [1, 6553500]")
}
}
if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
rlog.Fatal("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
}
if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
rlog.Fatal("option.PullThresholdSizeForTopic out of range [1, 102400]")
}
}
if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
rlog.Fatal("option.PullInterval out of range [0, 65535]")
}
if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 512
} else {
rlog.Fatal("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
}
}
if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
} else {
rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
}
}
}
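// pullMessage is the long-running pull loop for a single PullRequest/MessageQueue. A companion
// goroutine keeps feeding the process queue into submitToConsume, while the loop itself applies flow
// control (cached count, cached size and, for concurrent consuming, offset span), builds a
// PullMessageRequest, and dispatches the result according to the pull status, updating the next
// offset on success.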
func (pc *pushConsumer) pullMessage(request *PullRequest) {
rlog.Infof("start a nwe Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
var sleepTime time.Duration
pq := request.pq
go func() {
for {
pc.submitToConsume(request.pq, request.mq)
}
}()
for {
NEXT:
if pq.dropped {
rlog.Infof("the request: [%s] was dropped, so stop task", request.String())
return
}
if sleepTime > 0 {
rlog.Infof("pull MessageQueue: %d sleep %d ms", request.mq.QueueId, sleepTime/time.Millisecond)
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
pq.lastPullTime = time.Now()
err := pc.makeSureStateOK()
if err != nil {
rlog.Warnf("consumer state error: %s", err.Error())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {
rlog.Infof("consumer [%s] of [%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup, request.String())
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
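// flow control: postpone this pull round when the cached message count or the cached message size
// of the queue exceeds the configured thresholds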
cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warnf("the cached message count exceeds the threshold %d, so do flow control, "+
"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
pc.option.PullThresholdForQueue, 0, pq.Min(), pq.Max(),
pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warnf("the cached message size exceeds the threshold %d MiB, so do flow control, "+
"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
pc.option.PullThresholdSizeForQueue, pq.Min(), pq.Max(),
pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if !pc.consumeOrderly {
if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
rlog.Warnf("the queue's messages, span too long, limit=%d, so do flow control, minOffset=%d, "+
"maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
pq.Min(), pq.Max(), pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
}
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
} else {
if pq.locked {
if !request.lockedFirst {
offset := pc.computePullFromWhere(request.mq)
brokerBusy := offset < request.nextOffset
rlog.Infof("the first time to pull message, so fix offset from broker. "+
"pullRequest: [%s] NewOffset: %d brokerBusy: %v",
request.String(), offset, brokerBusy)
if brokerBusy {
rlog.Infof("[NOTIFY_ME]the first time to pull message, but pull request offset"+
" larger than broker consume offset. pullRequest: [%s] NewOffset: %d",
request.String(), offset)
}
request.lockedFirst = true
request.nextOffset = offset
}
} else {
rlog.Infof("pull message later because not locked in broker, [%s]", request.String())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
}
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {
rlog.Warnf("find the consumer's subscription failed, %s", request.String())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
beginTime := time.Now()
var (
commitOffsetEnable bool
commitOffsetValue int64
subExpression string
)
if pc.model == Clustering {
commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {
commitOffsetEnable = true
}
}
sd := v.(*kernel.SubscriptionData)
classFilter := sd.ClassFilterMode
if pc.option.PostSubscriptionWhenPull && classFilter {
subExpression = sd.SubString
}
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
pullRequest := &kernel.PullMessageRequest{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: _SubAll,
ExpressionType: string(TAG), // TODO
}
//
//if data.ExpType == string(TAG) {
// pullRequest.SubVersion = 0
//} else {
// pullRequest.SubVersion = data.SubVersion
//}
brokerResult := tryFindBroker(request.mq)
if brokerResult == nil {
rlog.Warnf("no broker found for %s", request.mq.String())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == kernel.PullBrokerTimeout {
rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
switch result.Status {
case kernel.PullFound:
rlog.Debugf("Topic: %s, QueueId: %d found messages: %d", request.mq.Topic, request.mq.QueueId,
len(result.GetMessageExts()))
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Since(beginTime)
increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if msgFounded != nil && len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
}
case kernel.PullNoNewMsg:
rlog.Debugf("Topic: %s, QueueId: %d no more msg, next offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
case kernel.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case kernel.PullOffsetIllegal:
rlog.Warnf("the pull request offset illegal, {} {}", request.String(), result.String())
request.nextOffset = result.NextBeginOffset
pq.dropped = true
go func() {
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*kernel.MessageQueue{request.mq})
pc.storage.remove(request.mq)
rlog.Warnf("fix the pull request offset: %s", request.String())
}()
default:
rlog.Warnf("unknown pull status: %v", result.Status)
sleepTime = _PullDelayTimeWhenError
}
}
}
func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
// TODO
}
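// sendMessageBack is supposed to return a failed message to the broker so that it can be
// redelivered. It is currently a stub that always reports success, so consumeMessageCurrently never
// collects messages into msgBackFailed.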
func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.MessageExt) bool {
return true
}
func (pc *pushConsumer) suspend() {
pc.pause = true
rlog.Infof("suspend consumer: %s", pc.consumerGroup)
}
func (pc *pushConsumer) resume() {
pc.pause = false
pc.doBalance()
rlog.Infof("resume consumer: %s", pc.consumerGroup)
}
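// resetOffset drops the process queues of the given message queues, waits for in-flight consuming to
// settle, then rewrites the consume offsets from the supplied table and removes the now unnecessary
// message queues.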
func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
// rlog.Warnf("received reset offset command from: %s, but missing params.", from)
// return
//}
//t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
//if err != nil {
// rlog.Warnf("received reset offset command from: %s, but parse time error: %s", err.Error())
// return
//}
//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
// from, topic, group, t)
//
//offsetTable := make(map[kernel.MessageQueue]int64, 0)
//err = json.Unmarshal(cmd.Body, &offsetTable)
//if err != nil {
// rlog.Warnf("received reset offset command from: %s, but parse offset table: %s", err.Error())
// return
//}
//v, exist := c.consumerMap.Load(group)
//if !exist {
// rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
// return
//}
set := make(map[int]*kernel.MessageQueue, 0)
for k := range table {
mq := k // copy the loop variable so every entry gets its own address
set[mq.HashCode()] = &mq
}
pc.processQueueTable.Range(func(key, value interface{}) bool {
mqHash := key.(int)
pq := value.(*processQueue)
if set[mqHash] != nil {
pq.dropped = true
pq.clear()
}
return true
})
time.Sleep(10 * time.Second)
v, exist := pc.topicSubscribeInfoTable.Load(topic)
if !exist {
return
}
queuesOfTopic := v.(map[int]*kernel.MessageQueue)
for k := range queuesOfTopic {
q := set[k]
if q != nil {
pc.storage.update(q, table[*q], false)
v, exist := pc.processQueueTable.Load(k)
if !exist {
continue
}
pq := v.(*processQueue)
pc.removeUnnecessaryMessageQueue(q, pq)
delete(queuesOfTopic, k)
}
}
}
func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
if !pc.consumeOrderly || Clustering != pc.model {
return true
}
// TODO orderly
return true
}
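// ConsumeMessageContext carries per-batch consume metadata: the group, the batch of messages, the
// originating message queue and the consume outcome, plus free-form properties used by hooks.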
type ConsumeMessageContext struct {
consumerGroup string
msgs []*kernel.MessageExt
mq *kernel.MessageQueue
success bool
status string
// mqTraceContext
properties map[string]string
}
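// consumeMessageCurrently drains the process queue and consumes the messages concurrently: the batch
// is split into slices of at most ConsumeMessageBatchMaxSize messages, each slice is handed to the
// user callback in its own goroutine, statistics are updated and, on failure, messages that could
// not be sent back to the broker are retried locally after a short delay.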
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.MessageQueue) {
msgs := pq.getMessages()
if msgs == nil {
return
}
for count := 0; count < len(msgs); count++ {
var subMsgs []*kernel.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
subMsgs = msgs[count:]
count = len(msgs)
} else {
next := count + pc.option.ConsumeMessageBatchMaxSize
subMsgs = msgs[count:next]
count = next - 1 // the for loop's post statement advances count to the next batch start
}
go func() {
RETRY:
if pq.dropped {
rlog.Infof("the message queue not be able to consume, because it was dropped. group=%s, mq=%s",
pc.consumerGroup, mq.String())
return
}
ctx := &ConsumeMessageContext{
properties: make(map[string]string),
}
// TODO hook
beginTime := time.Now()
groupTopic := kernel.RetryGroupTopicPrefix + pc.consumerGroup
for idx := range subMsgs {
msg := subMsgs[idx]
if msg.Properties != nil {
retryTopic := msg.Properties[kernel.PropertyRetryTopic]
// restore the original topic when consuming from the group's retry topic
if retryTopic != "" && groupTopic == msg.Topic {
msg.Topic = retryTopic
}
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
beginTime.UnixNano()/int64(time.Millisecond), 10)
}
}
result, err := pc.consume(ctx, subMsgs)
consumeRT := time.Since(beginTime)
if err != nil {
ctx.properties["ConsumeContextType"] = "EXCEPTION"
} else if consumeRT >= pc.option.ConsumeTimeout {
ctx.properties["ConsumeContextType"] = "TIMEOUT"
} else if result == ConsumeSuccess {
ctx.properties["ConsumeContextType"] = "SUCCESS"
} else {
ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
}
// TODO hook
increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
if !pq.dropped {
msgBackFailed := make([]*kernel.MessageExt, 0)
if result == ConsumeSuccess {
increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
} else {
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
for i := 0; i < len(subMsgs); i++ {
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it", subMsgs[i])
}
} else {
for i := 0; i < len(subMsgs); i++ {
msg := subMsgs[i]
if !pc.sendMessageBack(ctx, msg) {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
}
}
}
offset := pq.removeMessage(subMsgs...)
if offset >= 0 && !pq.dropped {
pc.storage.update(mq, int64(offset), true)
}
if len(msgBackFailed) > 0 {
subMsgs = msgBackFailed
time.Sleep(5 * time.Second)
goto RETRY
}
} else {
rlog.Warnf("processQueue is dropped without process consume result. messageQueue=%s, msgs=%+v",
mq, msgs)
}
}()
}
}
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *kernel.MessageQueue) {
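// TODO: orderly consumption is not implemented yet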
}