blob: 06188428f204f385889ce165a14913ddf79a2cf9 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernel
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
"strconv"
"time"
)
//PullMessageController put pull message logic here
type PullMessageController struct {
mqClient RocketMqClient
clientFactory *clientFactory
}
func newPullMessageController(mqClient RocketMqClient, clientFactory *clientFactory) *PullMessageController {
return &PullMessageController{
mqClient: mqClient,
clientFactory: clientFactory,
}
}
func (p *PullMessageController) start() {
go func() {
for {
pullRequest := p.mqClient.dequeuePullMessageRequest()
p.pullMessage(pullRequest)
}
}()
}
func (p *PullMessageController) needDelayPullMessage(mqPushConsumer *DefaultMQPushConsumer, pullRequest *model.PullRequest) (needDelayTime int64) {
if pullRequest.ProcessQueue.GetMsgCount() > mqPushConsumer.ConsumerConfig.PullThresholdForQueue {
return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
}
if pullRequest.ProcessQueue.GetMaxSpan() > mqPushConsumer.ConsumerConfig.ConsumeConcurrentlyMaxSpan {
return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
}
return
}
func (p *PullMessageController) pullMessageLater(pullRequest *model.PullRequest, millisecond int64) {
go func() {
timeoutTimer := time.NewTimer(time.Duration(millisecond) * time.Millisecond)
<-timeoutTimer.C
p.pullMessage(pullRequest)
}()
return
}
func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
defaultMQPullConsumer := p.clientFactory.consumerTable[pullRequest.ConsumerGroup]
if pullRequest.ProcessQueue.IsDropped() {
return
}
delayPullTime := p.needDelayPullMessage(defaultMQPullConsumer, pullRequest)
if delayPullTime > 0 {
p.pullMessageLater(pullRequest, delayPullTime)
return
}
commitOffsetValue := defaultMQPullConsumer.offsetStore.readOffset(pullRequest.MessageQueue, READ_FROM_MEMORY)
subscriptionData, ok := defaultMQPullConsumer.rebalance.subscriptionInner[pullRequest.MessageQueue.Topic]
if !ok {
p.pullMessageLater(pullRequest, defaultMQPullConsumer.ConsumerConfig.PullTimeDelayMillsWhenException)
return
}
var sysFlag int32
if commitOffsetValue > 0 {
sysFlag |= constant.FLAG_COMMIT_OFFSET
}
sysFlag |= constant.FLAG_SUSPEND
sysFlag |= constant.FLAG_SUBSCRIPTION
requestHeader := new(header.PullMessageRequestHeader)
requestHeader.ConsumerGroup = pullRequest.ConsumerGroup
requestHeader.Topic = pullRequest.MessageQueue.Topic
requestHeader.QueueId = pullRequest.MessageQueue.QueueId
requestHeader.QueueOffset = pullRequest.NextOffset
requestHeader.CommitOffset = commitOffsetValue
requestHeader.SuspendTimeoutMillis = defaultMQPullConsumer.ConsumerConfig.BrokerSuspendMaxTimeMillis
requestHeader.MaxMsgNums = int32(defaultMQPullConsumer.ConsumerConfig.PullBatchSize)
requestHeader.SubVersion = subscriptionData.SubVersion
requestHeader.Subscription = subscriptionData.SubString
requestHeader.SysFlag = sysFlag
pullCallback := func(responseFuture *remoting.ResponseFuture) {
var nextBeginOffset int64 = pullRequest.NextOffset
if responseFuture != nil {
responseCommand := responseFuture.ResponseCommand
if responseCommand.Code == remoting.SUCCESS && len(responseCommand.Body) > 0 {
nextBeginOffset = parseNextBeginOffset(responseCommand)
//}
msgs := decodeMessage(responseFuture.ResponseCommand.Body)
msgs = filterMessageAgainByTags(msgs, defaultMQPullConsumer.subscriptionTag[pullRequest.MessageQueue.Topic])
if len(msgs) == 0 {
if pullRequest.ProcessQueue.GetMsgCount() == 0 {
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
}
}
pullRequest.ProcessQueue.PutMessage(msgs)
defaultMQPullConsumer.consumeMessageService.submitConsumeRequest(msgs, pullRequest.ProcessQueue, pullRequest.MessageQueue, true)
} else {
//var err error // change the offset , use nextBeginOffset
//pullResult := responseCommand.ExtFields
//if ok {
// if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
// if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
// nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
// if err != nil {
// glog.Error(err)
// }
// }
// }
nextBeginOffset = parseNextBeginOffset(responseCommand)
//}
if responseCommand.Code == remoting.PULL_NOT_FOUND || responseCommand.Code == remoting.PULL_RETRY_IMMEDIATELY {
//NO_NEW_MSG //NO_MATCHED_MSG
if pullRequest.ProcessQueue.GetMsgCount() == 0 {
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
}
//update offset increase only
//failedPullRequest, _ := json.Marshal(pullRequest)
//glog.Error("the pull request offset illegal", string(failedPullRequest))
} else if responseCommand.Code == remoting.PULL_OFFSET_MOVED {
//OFFSET_ILLEGAL
glog.Error(fmt.Sprintf("PULL_OFFSET_MOVED,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body)))
pullRequest.ProcessQueue.SetDrop(true)
go func() {
executeTaskLater := time.NewTimer(10 * time.Second)
<-executeTaskLater.C
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, false)
defaultMQPullConsumer.rebalance.removeProcessQueue(pullRequest.MessageQueue)
}()
} else {
glog.Errorf("illegal response code. pull message error,code=%d,request=%v OFFSET_ILLEGAL", responseCommand.Code, requestHeader)
glog.Error(pullRequest.MessageQueue)
time.Sleep(1 * time.Second)
}
}
} else {
glog.Error("responseFuture is nil")
}
p.enqueueNextPullRequest(defaultMQPullConsumer, pullRequest, nextBeginOffset)
}
glog.V(2).Infof("requestHeader look offset %s %s %s %s", requestHeader.QueueOffset, requestHeader.Topic, requestHeader.QueueId, requestHeader.CommitOffset)
p.consumerPullMessageAsync(pullRequest.MessageQueue.BrokerName, requestHeader, pullCallback)
}
//func (p *PullMessageController) updateOffsetIfNeed(msgs []message.MessageExtImpl, pullRequest *model.PullRequest, defaultMQPullConsumer *DefaultMQPushConsumer, nextBeginOffset int64) {
// if len(msgs) == 0 {
// if pullRequest.ProcessQueue.GetMsgCount() == 0 {
// defaultMQPullConsumer.OffsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
// }
// }
//}
func parseNextBeginOffset(responseCommand *remoting.RemotingCommand) (nextBeginOffset int64) {
var err error
pullResult := responseCommand.ExtFields
if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
if err != nil {
panic(err)
}
}
}
return
}
func (p *PullMessageController) enqueueNextPullRequest(defaultMQPullConsumer *DefaultMQPushConsumer, pullRequest *model.PullRequest, nextBeginOffset int64) {
if pullRequest.ProcessQueue.IsDropped() {
return
}
nextPullRequest := &model.PullRequest{
ConsumerGroup: pullRequest.ConsumerGroup,
NextOffset: nextBeginOffset,
MessageQueue: pullRequest.MessageQueue,
ProcessQueue: pullRequest.ProcessQueue,
}
if defaultMQPullConsumer.ConsumerConfig.PullInterval > 0 {
go func() {
nextPullTime := time.NewTimer(time.Duration(defaultMQPullConsumer.ConsumerConfig.PullInterval) * time.Millisecond)
<-nextPullTime.C
p.mqClient.enqueuePullMessageRequest(nextPullRequest)
}()
} else {
p.mqClient.enqueuePullMessageRequest(nextPullRequest)
}
}
func filterMessageAgainByTags(msgExts []message.MessageExtImpl, subscriptionTagList []string) (result []message.MessageExtImpl) {
result = msgExts
if len(subscriptionTagList) == 0 {
return
}
result = []message.MessageExtImpl{}
for _, msg := range msgExts {
for _, tag := range subscriptionTagList {
if tag == msg.Tag() {
result = append(result, msg)
break
}
}
}
return
}
func (p *PullMessageController) consumerPullMessageAsync(brokerName string, requestHeader remoting.CustomerHeader, invokeCallback remoting.InvokeCallback) {
brokerAddr, _, found := p.mqClient.findBrokerAddressInSubscribe(brokerName, 0, false)
if found {
remotingCommand := remoting.NewRemotingCommand(remoting.PULL_MESSAGE, requestHeader)
p.mqClient.getRemotingClient().InvokeAsync(brokerAddr, remotingCommand, 1000, invokeCallback)
}
}
func decodeMessage(data []byte) []message.MessageExtImpl {
buf := bytes.NewBuffer(data)
var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32
var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64
var topicLen byte
var topic, body, properties, bornHost, storeHost []byte
var propertiesLength int16
var propertiesMap = make(map[string]string)
msgs := []message.MessageExtImpl{}
for buf.Len() > 0 {
msg := message.MessageExtImpl{MessageImpl: &message.MessageImpl{}}
binary.Read(buf, binary.BigEndian, &storeSize)
binary.Read(buf, binary.BigEndian, &magicCode)
binary.Read(buf, binary.BigEndian, &bodyCRC)
binary.Read(buf, binary.BigEndian, &queueId)
binary.Read(buf, binary.BigEndian, &flag)
binary.Read(buf, binary.BigEndian, &queueOffset)
binary.Read(buf, binary.BigEndian, &physicOffset)
binary.Read(buf, binary.BigEndian, &sysFlag)
binary.Read(buf, binary.BigEndian, &bornTimeStamp)
bornHost = make([]byte, 4)
binary.Read(buf, binary.BigEndian, &bornHost)
binary.Read(buf, binary.BigEndian, &bornPort)
binary.Read(buf, binary.BigEndian, &storeTimestamp)
storeHost = make([]byte, 4)
binary.Read(buf, binary.BigEndian, &storeHost)
binary.Read(buf, binary.BigEndian, &storePort)
binary.Read(buf, binary.BigEndian, &reconsumeTimes)
binary.Read(buf, binary.BigEndian, &preparedTransactionOffset)
binary.Read(buf, binary.BigEndian, &bodyLength)
if bodyLength > 0 {
body = make([]byte, bodyLength)
binary.Read(buf, binary.BigEndian, body)
if (sysFlag & constant.CompressedFlag) == constant.CompressedFlag {
var err error
body, err = util.UnCompress(body)
if err != nil {
glog.Error(err)
return nil
}
}
}
binary.Read(buf, binary.BigEndian, &topicLen)
topic = make([]byte, int(topicLen))
binary.Read(buf, binary.BigEndian, &topic)
binary.Read(buf, binary.BigEndian, &propertiesLength)
if propertiesLength > 0 {
properties = make([]byte, propertiesLength)
binary.Read(buf, binary.BigEndian, &properties)
propertiesMap = util.String2MessageProperties(string(properties))
}
if magicCode != -626843481 {
glog.Errorf("magic code is error %d", magicCode)
return nil
}
msg.SetTopic(string(topic))
msg.QueueId = queueId
msg.SysFlag = sysFlag
msg.QueueOffset = queueOffset
msg.BodyCRC = bodyCRC
msg.StoreSize = storeSize
msg.BornTimestamp = bornTimeStamp
msg.ReconsumeTimes = reconsumeTimes
msg.SetFlag(int(flag))
msg.CommitLogOffset = physicOffset
msg.StoreTimestamp = storeTimestamp
msg.PreparedTransactionOffset = preparedTransactionOffset
msg.SetBody(body)
msg.SetProperties(propertiesMap)
// < 3.5.8 use messageOffsetId
// >= 3.5.8 use clientUniqMsgId
msg.SetMsgId(msg.GetMsgUniqueKey())
if len(msg.MsgId()) == 0 {
msg.SetMsgId(message.GeneratorMessageOffsetId(storeHost, storePort, msg.CommitLogOffset))
}
msgs = append(msgs, msg)
}
return msgs
}