blob: e4a08df40294ed4ae75691cb507ee7452165dc3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package service
import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/golang/glog"
"time"
)
type ConsumeMessageService interface {
//ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue,
messageQueue *model.MessageQueue, dispathToConsume bool)
SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
}
type ConsumeMessageConcurrentlyServiceImpl struct {
consumerGroup string
messageListener model.MessageListener
sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
offsetStore OffsetStore
consumerConfig *config.RocketMqConsumerConfig
}
func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) {
consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener: messageListener, sendMessageBackProducerService: &SendMessageBackProducerServiceImpl{}}
return
}
func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
self.consumerGroup = consumerGroup
self.offsetStore = offsetStore
self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient, defaultProducerService, consumerConfig)
self.consumerConfig = consumerConfig
}
func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []model.MessageExt, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) {
msgsLen := len(msgs)
for i := 0; i < msgsLen; {
begin := i
end := i + self.consumerConfig.ConsumeMessageBatchMaxSize
if end > msgsLen {
end = msgsLen
}
go func() {
glog.V(2).Infof("look slice begin %d end %d msgsLen %d", begin, end, msgsLen)
batchMsgs := transformMessageToConsume(self.consumerGroup, msgs[begin:end])
consumeState := self.messageListener(batchMsgs)
self.processConsumeResult(consumeState, batchMsgs, messageQueue, processQueue)
}()
i = end
}
return
}
func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
return
}
func (self *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageExt *model.MessageExt, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
start := time.Now().UnixNano() / 1000000
consumeResult := self.messageListener([]model.MessageExt{*messageExt})
consumeMessageDirectlyResult.AutoCommit = true
consumeMessageDirectlyResult.Order = false
consumeMessageDirectlyResult.SpentTimeMills = time.Now().UnixNano()/1000000 - start
if consumeResult.ConsumeConcurrentlyStatus == "CONSUME_SUCCESS" && consumeResult.AckIndex >= 0 {
consumeMessageDirectlyResult.ConsumeResult = "CR_SUCCESS"
} else {
consumeMessageDirectlyResult.ConsumeResult = "CR_THROW_EXCEPTION"
}
return
}
func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result model.ConsumeConcurrentlyResult, msgs []model.MessageExt, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) {
if processQueue.IsDropped() {
glog.Warning("processQueue is dropped without process consume result. ", msgs)
return
}
if len(msgs) == 0 {
return
}
ackIndex := result.AckIndex
if model.CONSUME_SUCCESS == result.ConsumeConcurrentlyStatus {
if ackIndex >= len(msgs) {
ackIndex = len(msgs) - 1
} else {
if result.AckIndex < 0 {
ackIndex = -1
}
}
}
var failedMessages []model.MessageExt
successMessages := []model.MessageExt{}
if ackIndex >= 0 {
successMessages = msgs[:ackIndex+1]
}
for i := ackIndex + 1; i < len(msgs); i++ {
err := self.SendMessageBack(&msgs[i], 0, messageQueue.BrokerName)
if err != nil {
msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1
failedMessages = append(failedMessages, msgs[i])
} else {
successMessages = append(successMessages, msgs[i])
}
}
if len(failedMessages) > 0 {
self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
}
commitOffset := processQueue.RemoveMessage(successMessages)
if commitOffset > 0 && !processQueue.IsDropped() {
self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
}
}
func transformMessageToConsume(consumerGroup string, msgs []model.MessageExt) []model.MessageExt {
retryTopicName := constant.RETRY_GROUP_TOPIC_PREFIX + consumerGroup
for _, msg := range msgs {
//reset retry topic name
if msg.Message.Topic == retryTopicName {
retryTopic := msg.Properties[constant.PROPERTY_RETRY_TOPIC]
if len(retryTopic) > 0 {
msg.Message.Topic = retryTopic
}
}
//set consume start time
msg.SetConsumeStartTime()
}
return msgs
}