blob: 245bbe43b97bc424d37b6b2946b5312cf55c44a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rocketmq
import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
"github.com/golang/glog"
"strings"
"time"
)
type Consumer interface {
RegisterMessageListener(listener model.MessageListener)
Subscribe(topic string, subExpression string)
}
type DefaultMQPushConsumer struct {
consumerGroup string
//consumeFromWhere string
consumeType string
messageModel string
unitMode bool
subscription map[string]string //topic|subExpression
subscriptionTag map[string][]string // we use it filter again
offsetStore service.OffsetStore
mqClient service.RocketMqClient
rebalance *service.Rebalance
pause bool //when reset offset we need pause
consumeMessageService service.ConsumeMessageService
ConsumerConfig *config.RocketMqConsumerConfig
}
func NewDefaultMQPushConsumer(consumerGroup string) (defaultMQPushConsumer *DefaultMQPushConsumer) {
defaultMQPushConsumer = &DefaultMQPushConsumer{
consumerGroup: consumerGroup,
//consumeFromWhere:"CONSUME_FROM_FIRST_OFFSET", //todo use config
consumeType: "CONSUME_PASSIVELY",
messageModel: "CLUSTERING",
pause: false}
defaultMQPushConsumer.subscription = make(map[string]string)
defaultMQPushConsumer.subscriptionTag = make(map[string][]string)
defaultMQPushConsumer.ConsumerConfig = config.NewRocketMqConsumerConfig()
return
}
func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
self.subscription[topic] = subExpression
if len(subExpression) == 0 || subExpression == "*" {
return
}
tags := strings.Split(subExpression, "||")
tagsList := []string{}
for _, tag := range tags {
t := strings.TrimSpace(tag)
if len(t) == 0 {
continue
}
tagsList = append(tagsList, t)
}
if len(tagsList) > 0 {
self.subscriptionTag[topic] = tagsList
}
}
func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
}
func (self *DefaultMQPushConsumer) resetOffset(offsetTable map[model.MessageQueue]int64) {
self.pause = true
glog.Info("now we ClearProcessQueue 0 ", offsetTable)
self.rebalance.ClearProcessQueue(offsetTable)
glog.Info("now we ClearProcessQueue", offsetTable)
go func() {
waitTime := time.NewTimer(10 * time.Second)
<-waitTime.C
defer func() {
self.pause = false
self.rebalance.DoRebalance()
}()
for messageQueue, offset := range offsetTable {
processQueue := self.rebalance.GetProcessQueue(messageQueue)
if processQueue == nil || offset < 0 {
continue
}
glog.Info("now we UpdateOffset", messageQueue, offset)
self.offsetStore.UpdateOffset(&messageQueue, offset, false)
self.rebalance.RemoveProcessQueue(&messageQueue)
}
}()
}
func (self *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData {
subscriptions := make([]*model.SubscriptionData, 0)
for _, subscription := range self.rebalance.SubscriptionInner {
subscriptions = append(subscriptions, subscription)
}
return subscriptions
}
func (self *DefaultMQPushConsumer) CleanExpireMsg() {
nowTime := int64(time.Now().UnixNano()) / 1000000 //will cause nowTime - consumeStartTime <0 ,but no matter
messageQueueList, processQueueList := self.rebalance.GetProcessQueueList()
for messageQueueIndex, processQueue := range processQueueList {
loop := processQueue.GetMsgCount()
if loop > 16 {
loop = 16
}
for i := 0; i < loop; i++ {
_, message := processQueue.GetMinMessageInTree()
if message == nil {
break
}
consumeStartTime := message.GetConsumeStartTime()
maxDiffTime := self.ConsumerConfig.ConsumeTimeout * 1000 * 60
//maxDiffTime := self.ConsumerConfig.ConsumeTimeout
glog.V(2).Info("look message.GetConsumeStartTime()", consumeStartTime)
glog.V(2).Infof("look diff %d %d", nowTime-consumeStartTime, maxDiffTime)
//if(nowTime - consumeStartTime <0){
// panic("nowTime - consumeStartTime <0")
//}
if nowTime-consumeStartTime < maxDiffTime {
break
}
glog.Info("look now we send expire message back", message.Topic, message.MsgId)
err := self.consumeMessageService.SendMessageBack(message, 3, messageQueueList[messageQueueIndex].BrokerName)
if err != nil {
glog.Error("op=send_expire_message_back_error", err)
continue
}
processQueue.DeleteExpireMsg(int(message.QueueOffset))
}
}
return
}