blob: b26c82976cf8a75099a980ec74ac844f09f5b1df [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consumer
import (
"context"
"fmt"
"math"
"runtime/pprof"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
// ErrNoNewMsg returns a "no new message found". Occurs only when no new message found from broker
var ErrNoNewMsg = errors.New("no new message found")
func IsNoNewMsgError(err error) bool {
return err == ErrNoNewMsg
}
type ConsumeRequest struct {
messageQueue *primitive.MessageQueue
processQueue *processQueue
msgList []*primitive.MessageExt
}
func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
return cr.msgList
}
func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
return cr.messageQueue
}
func (cr *ConsumeRequest) GetPQ() *processQueue {
return cr.processQueue
}
type defaultPullConsumer struct {
*defaultConsumer
topic string
selector MessageSelector
GroupName string
Model MessageModel
UnitMode bool
nextQueueSequence int64
allocateQueues []*primitive.MessageQueue
done chan struct{}
closeOnce sync.Once
consumeRequestCache chan *ConsumeRequest
submitToConsume func(*processQueue, *primitive.MessageQueue)
interceptor primitive.Interceptor
}
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
defaultOpts := defaultPullConsumerOptions()
for _, apply := range options {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
defaultOpts.Namesrv = srvs
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, defaultOpts.GroupName),
cType: _PullConsume,
state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,
allocate: defaultOpts.Strategy,
}
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
defaultOpts.Namesrv = dc.client.GetNameSrv()
c := &defaultPullConsumer{
defaultConsumer: dc,
done: make(chan struct{}, 1),
consumeRequestCache: make(chan *ConsumeRequest, 4),
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageConcurrently
c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
return c, nil
}
func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.topic = topic
pc.selector = selector
return nil
}
func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
topic = utils.WrapNamespace(pc.option.Namespace, topic)
pc.subscriptionDataTable.Delete(topic)
return nil
}
func (pc *defaultPullConsumer) Start() error {
var err error
pc.once.Do(func() {
err = pc.validate()
if err != nil {
rlog.Error("the consumer group option validate fail", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.LogKeyUnderlayError: err.Error(),
})
err = errors.Wrap(err, "the consumer group option validate fail")
return
}
err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = errors2.ErrCreated
return
}
err = pc.start()
if err != nil {
return
}
atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
go func() {
for {
select {
case pr := <-pc.prCh:
go func() {
pc.pullMessage(&pr)
}()
case <-pc.done:
rlog.Info("defaultPullConsumer close PullRequest listener.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
}()
})
if err != nil {
return err
}
pc.client.UpdateTopicRouteInfo()
_, exist := pc.topicSubscribeInfoTable.Load(pc.topic)
if !exist {
err = pc.Shutdown()
if err != nil {
rlog.Error("defaultPullConsumer.Shutdown . route info not found, it may not exist", map[string]interface{}{
rlog.LogKeyTopic: pc.topic,
rlog.LogKeyUnderlayError: err,
})
}
return fmt.Errorf("the topic=%s route info not found, it may not exist", pc.topic)
}
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
go pc.client.RebalanceImmediately()
return err
}
func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout time.Duration) (*ConsumeRequest, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case <-ctx.Done():
return nil, ErrNoNewMsg
case cr := <-pc.consumeRequestCache:
if cr.processQueue.IsDroppd() {
rlog.Info("defaultPullConsumer poll the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: cr.messageQueue.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return nil, ErrNoNewMsg
}
if len(cr.GetMsgList()) == 0 {
return nil, ErrNoNewMsg
}
return cr, nil
}
}
func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult) {
if cr == nil {
return
}
pq := cr.processQueue
mq := cr.messageQueue
msgList := cr.msgList
if len(msgList) == 0 || pq == nil || mq == nil {
return
}
RETRY:
if pq.IsDroppd() {
rlog.Info("defaultPullConsumer the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
pc.resetRetryAndNamespace(msgList)
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: msgList,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
concurrentCtx := primitive.NewConsumeConcurrentlyContext()
concurrentCtx.MQ = *mq
ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
msgCtx.Success = true
} else {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
msgCtx.Success = false
}
if pc.interceptor != nil {
pc.interceptor(ctx, msgList, nil, func(ctx context.Context, req, reply interface{}) error {
return nil
})
}
if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
msgBackSucceed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(msgList))
msgBackSucceed = msgList
} else {
pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(msgList))
if pc.model == BroadCasting {
for i := 0; i < len(msgList); i++ {
rlog.Warning("defaultPullConsumer BROADCASTING, the message consume failed, drop it", map[string]interface{}{
"message": msgList[i],
})
}
} else {
for i := 0; i < len(msgList); i++ {
msg := msgList[i]
if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msgBackSucceed = append(msgBackSucceed, msg)
} else {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
}
}
}
offset := pq.removeMessage(msgBackSucceed...)
if offset >= 0 && !pq.IsDroppd() {
pc.storage.update(mq, int64(offset), true)
}
if len(msgBackFailed) > 0 {
msgList = msgBackFailed
time.Sleep(5 * time.Second)
goto RETRY
}
} else {
rlog.Warning("defaultPullConsumer processQueue is dropped without process consume result.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
"message": msgList,
})
}
}
// resetRetryAndNamespace modify retry message.
func (pc *defaultPullConsumer) resetRetryAndNamespace(msgList []*primitive.MessageExt) {
groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
beginTime := time.Now()
for idx := range msgList {
msg := msgList[idx]
retryTopic := msg.GetProperty(primitive.PropertyRetryTopic)
if retryTopic == "" && groupTopic == msg.Topic {
msg.Topic = retryTopic
}
msgList[idx].WithProperty(primitive.PropertyConsumeStartTime, strconv.FormatInt(
beginTime.UnixNano()/int64(time.Millisecond), 10))
}
}
func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitive.PullResult, error) {
mq := pc.getNextQueueOf(pc.topic)
if mq == nil {
return nil, fmt.Errorf("prepare to pull topic: %s, but no queue is founded", pc.topic)
}
data := buildSubscriptionData(mq.Topic, pc.selector)
nextOffset, err := pc.nextOffsetOf(mq)
if err != nil {
return nil, err
}
result, err := pc.pull(context.Background(), mq, data, nextOffset, numbers)
if err != nil {
return nil, err
}
pc.processPullResult(mq, result, data)
if pc.interceptor != nil {
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: result.GetMessageExts(),
Success: true,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx context.Context, req, reply interface{}) error {
return nil
})
}
return result, nil
}
func (pc *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
var queues []*primitive.MessageQueue
var err error
if len(pc.allocateQueues) == 0 {
topic = utils.WrapNamespace(pc.option.Namespace, topic)
queues, err = pc.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
if err != nil {
rlog.Error("get next mq error", map[string]interface{}{
rlog.LogKeyTopic: topic,
rlog.LogKeyUnderlayError: err.Error(),
})
return nil
}
if len(queues) == 0 {
rlog.Warning("defaultPullConsumer.getNextQueueOf len is 0", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil
}
} else {
queues = pc.allocateQueues
}
index := int(atomic.LoadInt64(&pc.nextQueueSequence)) % len(queues)
atomic.AddInt64(&pc.nextQueueSequence, 1)
nextQueue := queues[index]
rlog.Info("defaultPullConsumer.getNextQueueOf", map[string]interface{}{
rlog.LogKeyTopic: topic,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.LogKeyMessageQueue: queues,
rlog.LogKeyAllocateMessageQueue: nextQueue.String(),
})
return nextQueue
}
func (pc *defaultPullConsumer) checkPull(mq *primitive.MessageQueue, offset int64, numbers int) error {
err := pc.makeSureStateOK()
if err != nil {
return err
}
if mq == nil {
return errors2.ErrMQEmpty
}
if offset < 0 {
return errors2.ErrOffset
}
if numbers <= 0 {
return errors2.ErrNumbers
}
return nil
}
// TODO: add timeout limit
// TODO: add hook
func (pc *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data *internal.SubscriptionData,
offset int64, numbers int) (*primitive.PullResult, error) {
mq.Topic = utils.WrapNamespace(pc.option.Namespace, mq.Topic)
pc.consumerGroup = utils.WrapNamespace(pc.option.Namespace, pc.consumerGroup)
if err := pc.checkPull(mq, offset, numbers); err != nil {
return nil, err
}
pc.subscriptionAutomatically(mq.Topic)
sysFlag := buildSysFlag(false, true, true, false)
pullResp, err := pc.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
if err != nil {
return nil, err
}
pc.processPullResult(mq, pullResp, data)
return pullResp, err
}
func (pc *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) (int64, error) {
return pc.computePullFromWhereWithException(queue)
}
// PullFrom pull messages of queue from the offset to offset + numbers
func (pc *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
if err := pc.checkPull(queue, offset, numbers); err != nil {
return nil, err
}
data := buildSubscriptionData(queue.Topic, pc.selector)
return pc.pull(ctx, queue, data, offset, numbers)
}
// UpdateOffset updateOffset update offset of queue in mem
func (pc *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {
return pc.updateOffset(queue, offset)
}
// PersistOffset persist all offset in mem.
func (pc *defaultPullConsumer) PersistOffset(ctx context.Context, topic string) error {
return pc.persistConsumerOffset()
}
// CurrentOffset return the current offset of queue in mem.
func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {
v := pc.queryOffset(queue)
return v, nil
}
// Shutdown close defaultConsumer, refuse new request.
func (pc *defaultPullConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
if pc.option.TraceDispatcher != nil {
pc.option.TraceDispatcher.Close()
}
close(pc.done)
pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})
return err
}
func (pc *defaultPullConsumer) PersistConsumerOffset() error {
return pc.defaultConsumer.persistConsumerOffset()
}
func (pc *defaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
}
func (pc *defaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
}
func (pc *defaultPullConsumer) SubscriptionDataList() []*internal.SubscriptionData {
return pc.defaultConsumer.SubscriptionDataList()
}
func (pc *defaultPullConsumer) IsUnitMode() bool {
return pc.unitMode
}
func (pc *defaultPullConsumer) GetcType() string {
return string(pc.cType)
}
func (pc *defaultPullConsumer) GetModel() string {
return pc.model.String()
}
func (pc *defaultPullConsumer) GetWhere() string {
switch pc.fromWhere {
case ConsumeFromLastOffset:
return "CONSUME_FROM_LAST_OFFSET"
case ConsumeFromFirstOffset:
return "CONSUME_FROM_FIRST_OFFSET"
case ConsumeFromTimestamp:
return "CONSUME_FROM_TIMESTAMP"
default:
return "UNKNOWN"
}
}
func (pc *defaultPullConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
}
func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
pc.defaultConsumer.doBalanceIfNotPaused()
}
func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()
pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
topic := key.(string)
info.SubscriptionData[value.(*internal.SubscriptionData)] = true
status := internal.ConsumeStatus{
PullRT: pc.stat.getPullRT(pc.consumerGroup, topic).avgpt,
PullTPS: pc.stat.getPullTPS(pc.consumerGroup, topic).tps,
ConsumeRT: pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
ConsumeOKTPS: pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
ConsumeFailedTPS: pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
}
info.StatusTable[topic] = status
return true
})
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pInfo := pq.currentInfo()
pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore)
info.MQTable[mq] = pInfo
return true
})
if stack {
var buffer strings.Builder
err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
if err != nil {
rlog.Error("error when get stack ", map[string]interface{}{
"error": err,
})
} else {
info.JStack = buffer.String()
}
}
nsAddr := ""
for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
}
info.Properties[internal.PropNameServerAddr] = nsAddr
info.Properties[internal.PropConsumeType] = string(pc.cType)
info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
info.Properties[internal.PropThreadPoolCoreSize] = "-1"
info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
return info
}
func (pc *defaultPullConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
return nil
}
func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
}
func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
var allocateQueues []*primitive.MessageQueue
pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
allocateQueues = append(allocateQueues, &mq)
return true
})
pc.allocateQueues = allocateQueues
pc.defaultConsumer.client.SendHeartbeatToAllBrokerWithLock()
}
func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {
var brokerAddr string
if len(brokerName) != 0 {
brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
} else {
brokerAddr = msg.StoreHost
}
resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil || resp.Code != internal.ResSuccess {
// send back as a normal message
return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes())
}
return true
}
func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
req := &internal.ConsumerSendMsgBackRequestHeader{
Group: pc.consumerGroup,
OriginTopic: msg.Topic,
Offset: msg.CommitLogOffset,
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
}
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
}
func (pc *defaultPullConsumer) getMaxReconsumeTimes() int32 {
if pc.option.MaxReconsumeTimes == -1 {
return 16
} else {
return pc.option.MaxReconsumeTimes
}
}
func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
rlog.Debug("defaultPullConsumer start a new Pull Message task for PullRequest", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
var sleepTime time.Duration
pq := request.pq
go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
rlog.Info("defaultPullConsumer close pullMessage.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
pc.submitToConsume(request.pq, request.mq)
if request.pq.IsDroppd() {
rlog.Info("defaultPullConsumer quit pullMessage for dropped queue.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
}
})
for {
NEXT:
select {
case <-pc.done:
rlog.Info("defaultPullConsumer close message handle.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
}
if pq.IsDroppd() {
rlog.Debug("defaultPullConsumer the request was dropped, so stop task", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
return
}
if sleepTime > 0 {
rlog.Debug(fmt.Sprintf("defaultPullConsumer pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
rlog.Warning("defaultPullConsumer state error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {
rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of [%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {
rlog.Info("defaultPullConsumer find the consumer's subscription failed", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
beginTime := time.Now()
sd := v.(*internal.SubscriptionData)
sysFlag := buildSysFlag(false, true, true, false)
pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
SubExpression: sd.SubString,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
}
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
if brokerResult == nil {
rlog.Warning("defaultPullConsumer no broker found for mq", map[string]interface{}{
rlog.LogKeyPullRequest: request.mq.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if brokerResult.Slave {
pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == primitive.PullBrokerTimeout {
rlog.Warning("defaultPullConsumer pull broker timeout", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
pc.processPullResult(request.mq, result, sd)
switch result.Status {
case primitive.PullFound:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Now().Sub(beginTime) / time.Millisecond
pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
"nextBeginOffset": result.NextBeginOffset,
"firstMsgOffset": firstMsgOffset,
"prevRequestOffset": prevRequestOffset,
})
}
case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
rlog.Warning("defaultPullConsumer the pull request offset illegal", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
"result": result.String(),
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.processQueueTable.Delete(*request.mq)
rlog.Warning(fmt.Sprintf("defaultPullConsumer fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("defaultPullConsumer unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
}
}
func (pc *defaultPullConsumer) correctTagsOffset(pr *PullRequest) {
if pr.pq.cachedMsgCount.Load() <= 0 {
pc.storage.update(pr.mq, pr.nextOffset, true)
}
}
func (pc *defaultPullConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) {
msgList := pq.getMessages()
if msgList == nil {
return
}
if pq.IsDroppd() {
rlog.Info("defaultPullConsumer consumeMessageConcurrently the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
cr := &ConsumeRequest{
messageQueue: mq,
processQueue: pq,
msgList: msgList,
}
select {
case <-pq.closeChan:
return
case pc.consumeRequestCache <- cr:
}
}
func (pc *defaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
consumerStatus := internal.NewConsumerStatus()
mqOffsetMap := pc.storage.getMQOffsetMap(topic)
if mqOffsetMap != nil {
consumerStatus.MQOffsetMap = mqOffsetMap
}
return consumerStatus
}
func (pc *defaultPullConsumer) validate() error {
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
return err
}
if pc.consumerGroup == internal.DefaultConsumerGroup {
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}
return nil
}