|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | * contributor license agreements.  See the NOTICE file distributed with | 
|  | * this work for additional information regarding copyright ownership. | 
|  | * The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | * (the "License"); you may not use this file except in compliance with | 
|  | * the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | *  Unless required by applicable law or agreed to in writing, software | 
|  | *  distributed under the License is distributed on an "AS IS" BASIS, | 
|  | *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | *  See the License for the specific language governing permissions and | 
|  | *  limitations under the License. | 
|  | */ | 
|  | package rocketmq | 
|  |  | 
|  | import ( | 
|  | "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" | 
|  | "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" | 
|  | "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service" | 
|  | "github.com/golang/glog" | 
|  | "strings" | 
|  | "time" | 
|  | ) | 
|  |  | 
|  | type Consumer interface { | 
|  | RegisterMessageListener(listener model.MessageListener) | 
|  | Subscribe(topic string, subExpression string) | 
|  | } | 
|  |  | 
|  | type DefaultMQPushConsumer struct { | 
|  | consumerGroup string | 
|  | //consumeFromWhere      string | 
|  | consumeType  string | 
|  | messageModel string | 
|  | unitMode     bool | 
|  |  | 
|  | subscription          map[string]string   //topic|subExpression | 
|  | subscriptionTag       map[string][]string // we use it filter again | 
|  | offsetStore           service.OffsetStore | 
|  | mqClient              service.RocketMqClient | 
|  | rebalance             *service.Rebalance | 
|  | pause                 bool //when reset offset we need pause | 
|  | consumeMessageService service.ConsumeMessageService | 
|  | ConsumerConfig        *config.RocketMqConsumerConfig | 
|  | } | 
|  |  | 
|  | func NewDefaultMQPushConsumer(consumerGroup string) (defaultMQPushConsumer *DefaultMQPushConsumer) { | 
|  | defaultMQPushConsumer = &DefaultMQPushConsumer{ | 
|  | consumerGroup: consumerGroup, | 
|  | //consumeFromWhere:"CONSUME_FROM_FIRST_OFFSET", //todo  use config | 
|  | consumeType:  "CONSUME_PASSIVELY", | 
|  | messageModel: "CLUSTERING", | 
|  | pause:        false} | 
|  | defaultMQPushConsumer.subscription = make(map[string]string) | 
|  | defaultMQPushConsumer.subscriptionTag = make(map[string][]string) | 
|  | defaultMQPushConsumer.ConsumerConfig = config.NewRocketMqConsumerConfig() | 
|  | return | 
|  | } | 
|  | func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) { | 
|  | self.subscription[topic] = subExpression | 
|  | if len(subExpression) == 0 || subExpression == "*" { | 
|  | return | 
|  | } | 
|  | tags := strings.Split(subExpression, "||") | 
|  | tagsList := []string{} | 
|  | for _, tag := range tags { | 
|  | t := strings.TrimSpace(tag) | 
|  | if len(t) == 0 { | 
|  | continue | 
|  | } | 
|  | tagsList = append(tagsList, t) | 
|  | } | 
|  | if len(tagsList) > 0 { | 
|  | self.subscriptionTag[topic] = tagsList | 
|  | } | 
|  | } | 
|  |  | 
|  | func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) { | 
|  | self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener) | 
|  | } | 
|  |  | 
|  | func (self *DefaultMQPushConsumer) resetOffset(offsetTable map[model.MessageQueue]int64) { | 
|  | self.pause = true | 
|  | glog.Info("now we ClearProcessQueue 0 ", offsetTable) | 
|  |  | 
|  | self.rebalance.ClearProcessQueue(offsetTable) | 
|  | glog.Info("now we ClearProcessQueue", offsetTable) | 
|  | go func() { | 
|  | waitTime := time.NewTimer(10 * time.Second) | 
|  | <-waitTime.C | 
|  | defer func() { | 
|  | self.pause = false | 
|  | self.rebalance.DoRebalance() | 
|  | }() | 
|  |  | 
|  | for messageQueue, offset := range offsetTable { | 
|  | processQueue := self.rebalance.GetProcessQueue(messageQueue) | 
|  | if processQueue == nil || offset < 0 { | 
|  | continue | 
|  | } | 
|  | glog.Info("now we UpdateOffset", messageQueue, offset) | 
|  | self.offsetStore.UpdateOffset(&messageQueue, offset, false) | 
|  | self.rebalance.RemoveProcessQueue(&messageQueue) | 
|  | } | 
|  | }() | 
|  | } | 
|  |  | 
|  | func (self *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData { | 
|  | subscriptions := make([]*model.SubscriptionData, 0) | 
|  | for _, subscription := range self.rebalance.SubscriptionInner { | 
|  | subscriptions = append(subscriptions, subscription) | 
|  | } | 
|  | return subscriptions | 
|  | } | 
|  |  | 
|  | func (self *DefaultMQPushConsumer) CleanExpireMsg() { | 
|  | nowTime := int64(time.Now().UnixNano()) / 1000000 //will cause nowTime - consumeStartTime <0 ,but no matter | 
|  | messageQueueList, processQueueList := self.rebalance.GetProcessQueueList() | 
|  | for messageQueueIndex, processQueue := range processQueueList { | 
|  | loop := processQueue.GetMsgCount() | 
|  | if loop > 16 { | 
|  | loop = 16 | 
|  | } | 
|  | for i := 0; i < loop; i++ { | 
|  | _, message := processQueue.GetMinMessageInTree() | 
|  | if message == nil { | 
|  | break | 
|  | } | 
|  | consumeStartTime := message.GetConsumeStartTime() | 
|  | maxDiffTime := self.ConsumerConfig.ConsumeTimeout * 1000 * 60 | 
|  | //maxDiffTime := self.ConsumerConfig.ConsumeTimeout | 
|  | glog.V(2).Info("look message.GetConsumeStartTime()", consumeStartTime) | 
|  | glog.V(2).Infof("look diff %d  %d", nowTime-consumeStartTime, maxDiffTime) | 
|  | //if(nowTime - consumeStartTime <0){ | 
|  | //	panic("nowTime - consumeStartTime <0") | 
|  | //} | 
|  | if nowTime-consumeStartTime < maxDiffTime { | 
|  | break | 
|  | } | 
|  | glog.Info("look now we send expire message back", message.Topic, message.MsgId) | 
|  | err := self.consumeMessageService.SendMessageBack(message, 3, messageQueueList[messageQueueIndex].BrokerName) | 
|  | if err != nil { | 
|  | glog.Error("op=send_expire_message_back_error", err) | 
|  | continue | 
|  | } | 
|  | processQueue.DeleteExpireMsg(int(message.QueueOffset)) | 
|  | } | 
|  | } | 
|  | return | 
|  | } |