/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consumer
import (
"context"
"fmt"
"math"
"runtime/pprof"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
// In most scenarios, the push consumer is the recommended way to consume messages.
//
// Technically speaking, this push client is a wrapper of the underlying pull service: on
// arrival of messages pulled from brokers, it invokes the registered callback handler to feed the messages.
//
// See quick start/Consumer in the example module for a typical usage.
//
// Thread Safety: after initialization, the instance can be regarded as thread-safe.
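//
// A minimal usage sketch (illustrative only; the group name, name-server address and
// topic below are placeholders, not values defined by this package):
//
//	c, err := NewPushConsumer(
//		WithGroupName("exampleGroup"),
//		WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
//	)
//	if err != nil {
//		panic(err)
//	}
//	err = c.Subscribe("exampleTopic", MessageSelector{}, func(ctx context.Context,
//		msgs ...*primitive.MessageExt) (ConsumeResult, error) {
//		for _, msg := range msgs {
//			fmt.Printf("subscribe callback: %v\n", msg)
//		}
//		return ConsumeSuccess, nil
//	})
//	if err != nil {
//		panic(err)
//	}
//	if err = c.Start(); err != nil {
//		panic(err)
//	}
//	// ... consume until done, then:
//	_ = c.Shutdown()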
const (
Mb = 1024 * 1024
)
type PushConsumerCallback struct {
topic string
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)
}
func (callback PushConsumerCallback) UniqueID() string {
return callback.topic
}
type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consumeFunc utils.Set
submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
interceptor primitive.Interceptor
queueLock *QueueLock
done chan struct{}
closeOnce sync.Once
crCh map[string]chan struct{}
}
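// NewPushConsumer creates a push consumer with the given options applied on top of the
// default push-consumer options. It builds the name-server client, applies credentials and
// namespace, and selects the consume loop (orderly or concurrent) according to the options.
// The returned consumer still needs Subscribe and Start to be called before it consumes anything.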
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
defaultOpts := defaultPushConsumerOptions()
for _, apply := range opts {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
if !defaultOpts.Credentials.IsEmpty() {
srvs.SetCredentials(defaultOpts.Credentials)
}
defaultOpts.Namesrv = srvs
if defaultOpts.Namespace != "" {
defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
}
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PushConsume,
state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
consumeOrderly: defaultOpts.ConsumeOrderly,
fromWhere: defaultOpts.FromWhere,
allocate: defaultOpts.Strategy,
option: defaultOpts,
}
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient failed")
}
defaultOpts.Namesrv = dc.client.GetNameSrv()
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
crCh: make(map[string]chan struct{}),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
p.submitToConsume = p.consumeMessageOrderly
} else {
p.submitToConsume = p.consumeMessageConcurrently
}
p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
return p, nil
}
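// Start validates the consumer options, registers the consumer with the client instance and
// starts the underlying defaultConsumer. It also launches the background goroutines that
// dispatch pull requests, clean expired messages (concurrent consuming) and renew broker
// locks (orderly consuming). It fails if validation or registration fails, or if a
// subscribed topic has no route info.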
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
rlog.Info("the consumer start beginning", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
"messageModel": pc.model,
"unitMode": pc.unitMode,
})
atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
err = pc.validate()
if err != nil {
rlog.Error("the consumer group option validate fail", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.LogKeyUnderlayError: err.Error(),
})
err = errors.Wrap(err, "the consumer group option validate fail")
return
}
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = errors2.ErrCreated
return
}
err = pc.defaultConsumer.start()
if err != nil {
return
}
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
pc.crCh[retryTopic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
go func() {
// TODO: start cleaning expired messages
for {
select {
case pr := <-pc.prCh:
go func() {
pc.pullMessage(&pr)
}()
case <-pc.done:
rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
}()
go primitive.WithRecover(func() {
if pc.consumeOrderly {
return
}
time.Sleep(pc.option.ConsumeTimeout)
pc.cleanExpiredMsg()
ticker := time.NewTicker(pc.option.ConsumeTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pc.cleanExpiredMsg()
case <-pc.done:
rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
})
go primitive.WithRecover(func() {
// initial lock.
if !pc.consumeOrderly {
return
}
time.Sleep(1000 * time.Millisecond)
pc.lockAll()
lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
defer lockTicker.Stop()
for {
select {
case <-lockTicker.C:
pc.lockAll()
case <-pc.done:
rlog.Info("push consumer close tick.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
})
})
if err != nil {
return err
}
pc.client.UpdateTopicRouteInfo()
for k := range pc.subscribedTopic {
_, exist := pc.topicSubscribeInfoTable.Load(k)
if !exist {
pc.Shutdown()
return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
}
}
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
go pc.client.RebalanceImmediately()
return err
}
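// GetOffsetDiffMap returns, per topic, the accumulated lag of this consumer: the sum of
// (max offset in queue - committed consume offset) over all currently assigned process queues.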
func (pc *pushConsumer) GetOffsetDiffMap() map[string]int64 {
offsetDiffMap := make(map[string]int64)
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
topic := mq.Topic
consumerOffset, _ := pc.storage.readWithException(&mq, _ReadFromMemory)
maxOffset := pq.maxOffsetInQueue
if consumerOffset < 0 || maxOffset < 0 || consumerOffset > maxOffset {
return true
}
if _, ok := offsetDiffMap[topic]; !ok {
offsetDiffMap[topic] = 0
}
offsetDiff := offsetDiffMap[topic]
offsetDiffMap[topic] = offsetDiff + (maxOffset - consumerOffset)
return true
})
return offsetDiffMap
}
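// Shutdown closes the consumer at most once: it stops the trace dispatcher, signals the
// background goroutines via the done channel, releases broker locks for orderly clustering
// consumers, unregisters from the client instance and shuts down the underlying defaultConsumer.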
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
if pc.option.TraceDispatcher != nil {
pc.option.TraceDispatcher.Close()
}
close(pc.done)
if pc.consumeOrderly && pc.model == Clustering {
pc.unlockAll(false)
}
pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})
return err
}
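// Subscribe registers the consume callback f for topic with the given selector. It must be
// called before Start; calling it after the consumer failed to start or was shut down
// returns ErrStartTopic. The topic is prefixed with the configured namespace, if any.
//
// For example, to consume only messages tagged TagA or TagB (illustrative; the topic name
// and callback are placeholders):
//
//	err := c.Subscribe("exampleTopic", MessageSelector{
//		Type:       TAG,
//		Expression: "TagA || TagB",
//	}, callback)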
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
if _, ok := pc.crCh[topic]; !ok {
pc.crCh[topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
pc.consumeFunc.Add(&PushConsumerCallback{
f: f,
topic: topic,
})
return nil
}
func (pc *pushConsumer) Unsubscribe(topic string) error {
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
pc.subscriptionDataTable.Delete(topic)
return nil
}
func (pc *pushConsumer) Suspend() {
pc.suspend()
}
func (pc *pushConsumer) Resume() {
pc.resume()
}
func (pc *pushConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
}
func (pc *pushConsumer) RebalanceIfNotPaused() {
pc.defaultConsumer.doBalanceIfNotPaused()
}
func (pc *pushConsumer) PersistConsumerOffset() error {
return pc.defaultConsumer.persistConsumerOffset()
}
func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
}
func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
}
func (pc *pushConsumer) SubscriptionDataList() []*internal.SubscriptionData {
return pc.defaultConsumer.SubscriptionDataList()
}
func (pc *pushConsumer) IsUnitMode() bool {
return pc.unitMode
}
func (pc *pushConsumer) GetcType() string {
return string(pc.cType)
}
func (pc *pushConsumer) GetModel() string {
return pc.model.String()
}
func (pc *pushConsumer) GetWhere() string {
switch pc.fromWhere {
case ConsumeFromLastOffset:
return "CONSUME_FROM_LAST_OFFSET"
case ConsumeFromFirstOffset:
return "CONSUME_FROM_FIRST_OFFSET"
case ConsumeFromTimestamp:
return "CONSUME_FROM_TIMESTAMP"
default:
return "UNKNOWN"
}
}
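// ConsumeMessageDirectly consumes a single message synchronously, typically on behalf of a
// broker or console request, and reports the consume result together with the time spent.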
func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
var msgs = []*primitive.MessageExt{msg}
var mq = &primitive.MessageQueue{
Topic: msg.Topic,
BrokerName: brokerName,
QueueId: msg.Queue.QueueId,
}
beginTime := time.Now()
pc.resetRetryAndNamespace(msgs)
var result ConsumeResult
var err error
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: msgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
concurrentCtx := primitive.NewConsumeConcurrentlyContext()
concurrentCtx.MQ = *mq
ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
result, err = pc.consumeInner(ctx, msgs)
consumeRT := time.Since(beginTime)
res := &internal.ConsumeMessageDirectlyResult{
Order: false,
AutoCommit: true,
SpentTimeMills: int64(consumeRT / time.Millisecond),
}
if err != nil {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
res.ConsumeResult = internal.ThrowException
res.Remark = err.Error()
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
res.ConsumeResult = internal.ConsumeSuccess
} else if result == ConsumeRetryLater {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
res.ConsumeResult = internal.ConsumeRetryLater
}
pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
return res
}
func (pc *pushConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
consumerStatus := internal.NewConsumerStatus()
mqOffsetMap := pc.storage.getMQOffsetMap(topic)
if mqOffsetMap != nil {
consumerStatus.MQOffsetMap = mqOffsetMap
}
return consumerStatus
}
func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()
pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
topic := key.(string)
info.SubscriptionData[value.(*internal.SubscriptionData)] = true
status := internal.ConsumeStatus{
PullRT: pc.stat.getPullRT(pc.consumerGroup, topic).avgpt,
PullTPS: pc.stat.getPullTPS(pc.consumerGroup, topic).tps,
ConsumeRT: pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
ConsumeOKTPS: pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
ConsumeFailedTPS: pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
}
info.StatusTable[topic] = status
return true
})
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pInfo := pq.currentInfo()
pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore)
info.MQTable[mq] = pInfo
return true
})
if stack {
var buffer strings.Builder
err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
if err != nil {
rlog.Error("error when getting stack", map[string]interface{}{
"error": err,
})
} else {
info.JStack = buffer.String()
}
}
nsAddr := ""
for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
}
info.Properties[internal.PropNameServerAddr] = nsAddr
info.Properties[internal.PropConsumeType] = string(pc.cType)
info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
info.Properties[internal.PropThreadPoolCoreSize] = "-1"
info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
return info
}
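// messageQueueChanged is called by the rebalance logic when the queues assigned for topic
// change. It bumps the subscription version, re-splits the topic-level pull thresholds
// across the currently assigned queues and sends a heartbeat to all brokers.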
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
v, exit := pc.subscriptionDataTable.Load(topic)
if !exit {
return
}
data := v.(*internal.SubscriptionData)
newVersion := time.Now().UnixNano()
rlog.Info("the MessageQueue changed, version also updated", map[string]interface{}{
rlog.LogKeyValueChangedFrom: data.SubVersion,
rlog.LogKeyValueChangedTo: newVersion,
})
data.Lock()
data.SubVersion = newVersion
data.Unlock()
// TODO: optimize
count := 0
pc.processQueueTable.Range(func(key, value interface{}) bool {
count++
return true
})
if count > 0 {
if pc.option.PullThresholdForTopic != -1 {
newVal := pc.option.PullThresholdForTopic / count
if newVal == 0 {
newVal = 1
}
rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForTopic,
rlog.LogKeyValueChangedTo: newVal,
})
pc.option.PullThresholdForTopic = newVal
}
if pc.option.PullThresholdSizeForTopic != -1 {
newVal := pc.option.PullThresholdSizeForTopic / count
if newVal == 0 {
newVal = 1
}
rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic,
rlog.LogKeyValueChangedTo: newVal,
})
pc.option.PullThresholdSizeForTopic = newVal
}
}
pc.client.SendHeartbeatToAllBrokerWithLock()
}
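// validate checks the consumer group name and normalizes the option ranges: unset (zero)
// values fall back to their defaults, while explicitly out-of-range values return an error.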
func (pc *pushConsumer) validate() error {
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
return err
}
if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FAQ
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}
if len(pc.subscribedTopic) == 0 {
rlog.Warning("not subscribe any topic yet", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
return errors.New("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
}
}
if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
}
}
if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
return errors.New("option.PullThresholdForTopic out of range [1, 6553500]")
}
}
if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
}
if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
return errors.New("option.PullThresholdSizeForTopic out of range [1, 102400]")
}
}
if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond {
return errors.New("option.PullInterval out of range [0, 65535] milliseconds")
}
if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 1
} else {
return errors.New("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
}
}
if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 {
if pullBatchSize == 0 {
pc.option.PullBatchSize.Store(32)
} else {
return errors.New("option.PullBatchSize out of range [1, 1024]")
}
}
if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 100000 {
if pc.option.ConsumeGoroutineNums == 0 {
pc.option.ConsumeGoroutineNums = 20
} else {
return errors.New("option.ConsumeGoroutineNums out of range [1, 100000]")
}
}
return nil
}
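// pullMessage is the long-running pull loop for a single PullRequest. It starts a goroutine
// that feeds pulled messages into submitToConsume, then repeatedly pulls from the broker,
// applying flow control on cached message count, cached size and offset span, and adjusts
// the next offset according to the pull status.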
func (pc *pushConsumer) pullMessage(request *PullRequest) {
rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
var sleepTime time.Duration
pq := request.pq
go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
rlog.Info("push consumer close pullMessage.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
pc.submitToConsume(request.pq, request.mq)
if request.pq.IsDroppd() {
rlog.Info("push consumer quit pullMessage for dropped queue.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
}
})
for {
NEXT:
select {
case <-pc.done:
rlog.Info("push consumer close message handle.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
}
if pq.IsDroppd() {
rlog.Debug("the request was dropped, so stop the task", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
return
}
if sleepTime > 0 {
rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
rlog.Warning("consumer state error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {
rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if !pc.consumeOrderly {
if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"maxSpan": pq.getMaxSpan(),
"flowControlTimes": pc.queueMaxSpanFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueMaxSpanFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
} else {
if pq.IsLock() {
if !request.lockedFirst {
offset, err := pc.computePullFromWhereWithException(request.mq)
if err != nil {
rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
brokerBusy := offset < request.nextOffset
rlog.Info("the first time to pull message, so fix offset from broker; offset may have changed", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
rlog.LogKeyValueChangedFrom: request.nextOffset,
rlog.LogKeyValueChangedTo: offset,
"brokerBusy": brokerBusy,
})
if brokerBusy {
rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+
"broker consume offset", map[string]interface{}{"offset": offset})
}
request.lockedFirst = true
request.nextOffset = offset
}
} else {
rlog.Info("pull message later because not locked in broker", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
}
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {
rlog.Info("failed to find the consumer's subscription", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
beginTime := time.Now()
var (
commitOffsetEnable bool
commitOffsetValue int64
subExpression string
)
if pc.model == Clustering {
commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {
commitOffsetEnable = true
}
}
sd := v.(*internal.SubscriptionData)
classFilter := sd.ClassFilterMode
if pc.option.PostSubscriptionWhenPull && !classFilter {
subExpression = sd.SubString
}
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: subExpression,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
}
//
//if data.ExpType == string(TAG) {
// pullRequest.SubVersion = 0
//} else {
// pullRequest.SubVersion = data.SubVersion
//}
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
if brokerResult == nil {
rlog.Warning("no broker found for mq", map[string]interface{}{
rlog.LogKeyPullRequest: request.mq.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if brokerResult.Slave {
pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warning("pull message from broker error", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == primitive.PullBrokerTimeout {
rlog.Warning("pull broker timeout", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
pc.processPullResult(request.mq, result, sd)
if result.MaxOffset > pq.maxOffsetInQueue {
pq.maxOffsetInQueue = result.MaxOffset
}
switch result.Status {
case primitive.PullFound:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Since(beginTime) / time.Millisecond
pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warning("[BUG] pull message result may be wrong", map[string]interface{}{
"nextBeginOffset": result.NextBeginOffset,
"firstMsgOffset": firstMsgOffset,
"prevRequestOffset": prevRequestOffset,
})
}
case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
rlog.Warning("the pull request offset illegal", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
"result": result.String(),
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.processQueueTable.Delete(*request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
}
}
func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
if pr.pq.cachedMsgCount.Load() <= 0 {
pc.storage.update(pr.mq, pr.nextOffset, true)
}
}
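// sendMessageBack returns a failed message to the broker so that it is redelivered later at
// the given delay level. If the broker call fails, the message is re-sent to the retry topic
// as a normal message instead.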
func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {
var brokerAddr string
if len(brokerName) != 0 {
brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
} else {
brokerAddr = msg.StoreHost
}
resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil || resp.Code != internal.ResSuccess {
// send back as a normal message
return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes())
}
return true
}
func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
req := &internal.ConsumerSendMsgBackRequestHeader{
Group: pc.consumerGroup,
OriginTopic: msg.Topic,
Offset: msg.CommitLogOffset,
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
}
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
}
func (pc *pushConsumer) suspend() {
pc.pause = true
rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
}
func (pc *pushConsumer) resume() {
pc.pause = false
pc.doBalance()
rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
}
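// ResetOffset suspends consumption, drops and clears the process queues of topic that appear
// in table, waits for in-flight consumption to drain, persists the new offsets and removes
// the affected queues so that they are re-pulled from the reset offsets.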
func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
// rlog.Warning("received reset offset command from: %s, but missing params.", from)
// return
//}
//t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
//if err != nil {
// rlog.Warning("received reset offset command from: %s, but parse time error: %s", err.Error())
// return
//}
//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
// from, topic, group, t)
//
//offsetTable := make(map[MessageQueue]int64, 0)
//err = json.Unmarshal(cmd.Body, &offsetTable)
//if err != nil {
// rlog.Warning("received reset offset command from: %s, but parse offset table: %s", err.Error())
// return
//}
//v, exist := c.consumerMap.Load(group)
//if !exist {
// rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
// return
//}
pc.suspend()
defer pc.resume()
mqs := make([]*primitive.MessageQueue, 0)
copyPc := sync.Map{}
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
if _, ok := table[mq]; ok && mq.Topic == topic {
pq.WithDropped(true)
pq.clear()
}
mqs = append(mqs, &mq)
copyPc.Store(&mq, pq)
return true
})
time.Sleep(10 * time.Second)
for _, mq := range mqs {
if _, ok := table[*mq]; ok {
pc.storage.update(mq, table[*mq], false)
v, exist := copyPc.Load(mq)
if !exist {
continue
}
pq := v.(*processQueue)
pc.removeUnnecessaryMessageQueue(mq, pq)
pc.processQueueTable.Delete(*mq)
}
}
}
func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool {
pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
if !pc.consumeOrderly || Clustering != pc.model {
return true
}
// TODO orderly
return true
}
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
if len(subMsgs) == 0 {
return ConsumeRetryLater, errors.New("msg list empty")
}
f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
// fix lost retry message
if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
}
if !exist {
return ConsumeRetryLater, fmt.Errorf("the consume callback is missing for topic: %s", subMsgs[0].Topic)
}
callback, ok := f.(*PushConsumerCallback)
if !ok {
return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
}
if pc.interceptor == nil {
return callback.f(ctx, subMsgs...)
} else {
var container ConsumeResultHolder
err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
msgs := req.([]*primitive.MessageExt)
r, e := callback.f(ctx, msgs...)
realReply := reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
msgCtx, _ := primitive.GetConsumerCtx(ctx)
msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
if realReply.ConsumeResult == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
} else {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
return e
})
return container.ConsumeResult, err
}
}
// resetRetryAndNamespace restores the original topic of retry messages and stamps the consume start time on each message.
func (pc *pushConsumer) resetRetryAndNamespace(subMsgs []*primitive.MessageExt) {
groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
beginTime := time.Now()
for idx := range subMsgs {
msg := subMsgs[idx]
retryTopic := msg.GetProperty(primitive.PropertyRetryTopic)
if retryTopic != "" && groupTopic == msg.Topic {
msg.Topic = retryTopic
}
subMsgs[idx].WithProperty(primitive.PropertyConsumeStartTime, strconv.FormatInt(
beginTime.UnixNano()/int64(time.Millisecond), 10))
}
}
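// consumeMessageConcurrently drains cached messages from the process queue, splits them into
// batches of ConsumeMessageBatchMaxSize and consumes each batch in its own goroutine, bounded
// by the per-topic channel of size ConsumeGoroutineNums. In clustering mode, messages that
// fail are sent back to the broker for redelivery; if send-back also fails they are retried
// locally after a delay. In broadcasting mode failed messages are logged and dropped.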
func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) {
msgs := pq.getMessages()
if msgs == nil {
return
}
limiter := pc.option.Limiter
limiterOn := limiter != nil
if _, ok := pc.crCh[mq.Topic]; !ok {
pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}
for count := 0; count < len(msgs); count++ {
var subMsgs []*primitive.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
subMsgs = msgs[count:]
count = len(msgs)
} else {
next := count + pc.option.ConsumeMessageBatchMaxSize
subMsgs = msgs[count:next]
count = next - 1
}
if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
}
pc.crCh[mq.Topic] <- struct{}{}
go primitive.WithRecover(func() {
defer func() {
if err := recover(); err != nil {
rlog.Error("consumeMessageConcurrently panic", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
<-pc.crCh[mq.Topic]
}()
RETRY:
if pq.IsDroppd() {
rlog.Info("the message queue cannot be consumed because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
beginTime := time.Now()
pc.resetRetryAndNamespace(subMsgs)
var result ConsumeResult
var err error
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: subMsgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
concurrentCtx := primitive.NewConsumeConcurrentlyContext()
concurrentCtx.MQ = *mq
ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
result, err = pc.consumeInner(ctx, subMsgs)
consumeRT := time.Since(beginTime)
if err != nil {
rlog.Warning("consumeMessageConcurrently error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
} else if consumeRT >= pc.option.ConsumeTimeout {
rlog.Warning("consumeMessageConcurrently time out", map[string]interface{}{
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
} else if result == ConsumeRetryLater {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
msgBackSucceed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
msgBackSucceed = subMsgs
} else {
pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
for i := 0; i < len(subMsgs); i++ {
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
"message": subMsgs[i],
})
}
} else {
for i := 0; i < len(subMsgs); i++ {
msg := subMsgs[i]
if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msgBackSucceed = append(msgBackSucceed, msg)
} else {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
}
}
}
offset := pq.removeMessage(msgBackSucceed...)
if offset >= 0 && !pq.IsDroppd() {
pc.storage.update(mq, int64(offset), true)
}
if len(msgBackFailed) > 0 {
subMsgs = msgBackFailed
time.Sleep(5 * time.Second)
goto RETRY
}
} else {
rlog.Warning("processQueue is dropped; skip processing the consume result.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
"message": subMsgs,
})
}
})
}
}
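// consumeMessageOrderly consumes the messages of a single queue sequentially under the
// per-queue lock. In clustering mode the broker lock must be held and unexpired, otherwise
// the queue is re-locked and re-consumed later. The commit offset is persisted according to
// the AutoCommit option and the consume result.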
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
if pq.IsDroppd() {
rlog.Warning("the message queue cannot be consumed because it was dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
return
}
lock := pc.queueLock.fetchLock(*mq)
lock.Lock()
defer lock.Unlock()
if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
beginTime := time.Now()
continueConsume := true
for continueConsume {
if pq.IsDroppd() {
rlog.Warning("the message queue cannot be consumed because it was dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
break
}
if pc.model == Clustering {
if !pq.IsLock() {
rlog.Warning("the message queue is not locked, so consume later", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
pc.tryLockLaterAndReconsume(mq, 10)
return
}
if pq.isLockExpired() {
rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
pc.tryLockLaterAndReconsume(mq, 10)
return
}
}
interval := time.Since(beginTime)
if interval > pc.option.MaxTimeConsumeContinuously {
time.Sleep(10 * time.Millisecond)
return
}
batchSize := pc.option.ConsumeMessageBatchMaxSize
msgs := pq.takeMessages(batchSize)
pc.resetRetryAndNamespace(msgs)
if len(msgs) == 0 {
continueConsume = false
break
}
// TODO: add message consumer hook
beginTime = time.Now()
ctx := context.Background()
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: msgs,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
orderlyCtx := primitive.NewConsumeOrderlyContext()
orderlyCtx.MQ = *mq
ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)
pq.lockConsume.Lock()
result, err := pc.consumeInner(ctx, msgs)
if err != nil {
rlog.Warning("consumeMessage orderly error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
pq.lockConsume.Unlock()
if result == Rollback || result == SuspendCurrentQueueAMoment {
rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
"messages": msgs,
rlog.LogKeyMessageQueue: mq,
})
}
// just put consumeResult in consumerMessageCtx
//interval = time.Now().Sub(beginTime)
//consumeReult := SuccessReturn
//if interval > pc.option.ConsumeTimeout {
// consumeReult = TimeoutReturn
//} else if SuspendCurrentQueueAMoment == result {
// consumeReult = FailedReturn
//} else if ConsumeSuccess == result {
// consumeReult = SuccessReturn
//}
// process result
commitOffset := int64(-1)
if pc.option.AutoCommit {
switch result {
case Commit, Rollback:
rlog.Warning("the message queue consume result is illegal, we think you want to ack these messages", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
})
case ConsumeSuccess:
commitOffset = pq.commit()
case SuspendCurrentQueueAMoment:
if pc.checkReconsumeTimes(msgs) {
pq.makeMessageToCosumeAgain(msgs...)
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
continueConsume = false
} else {
commitOffset = pq.commit()
}
default:
}
} else {
switch result {
case ConsumeSuccess:
case Commit:
commitOffset = pq.commit()
case Rollback:
// pq.rollback
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
continueConsume = false
case SuspendCurrentQueueAMoment:
if pc.checkReconsumeTimes(msgs) {
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
continueConsume = false
}
default:
}
}
if commitOffset > 0 && !pq.IsDroppd() {
_ = pc.updateOffset(mq, commitOffset)
}
}
} else {
if pq.IsDroppd() {
rlog.Warning("the message queue cannot be consumed because it was dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
}
pc.tryLockLaterAndReconsume(mq, 100)
}
}
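// checkReconsumeTimes reports whether orderly consumption of msgs should be suspended.
// Messages that exceeded the orderly max reconsume times are sent back to the broker; for
// the remaining messages, or when send-back fails, the reconsume counter is incremented and
// suspension is requested.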
func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool {
suspend := false
if len(msgs) != 0 {
maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
for _, msg := range msgs {
if msg.ReconsumeTimes > maxReconsumeTimes {
rlog.Warning(fmt.Sprintf("msg will be sent to retry topic because ReconsumeTimes > %d", maxReconsumeTimes), nil)
msg.WithProperty("RECONSUME_TIME", strconv.Itoa(int(msg.ReconsumeTimes)))
if !pc.sendMessageBack("", msg, -1) {
suspend = true
msg.ReconsumeTimes += 1
}
} else {
suspend = true
msg.ReconsumeTimes += 1
}
}
}
return suspend
}
func (pc *pushConsumer) getOrderlyMaxReconsumeTimes() int32 {
if pc.option.MaxReconsumeTimes == -1 {
return math.MaxInt32
} else {
return pc.option.MaxReconsumeTimes
}
}
func (pc *pushConsumer) getMaxReconsumeTimes() int32 {
if pc.option.MaxReconsumeTimes == -1 {
return 16
} else {
return pc.option.MaxReconsumeTimes
}
}
func (pc *pushConsumer) tryLockLaterAndReconsume(mq *primitive.MessageQueue, delay int64) {
time.Sleep(time.Duration(delay) * time.Millisecond)
if pc.lock(mq) {
pc.submitConsumeRequestLater(10)
} else {
pc.submitConsumeRequestLater(3000)
}
}
func (pc *pushConsumer) submitConsumeRequestLater(suspendTimeMillis int64) {
if suspendTimeMillis == -1 {
suspendTimeMillis = int64(pc.option.SuspendCurrentQueueTimeMillis / time.Millisecond)
}
if suspendTimeMillis < 10 {
suspendTimeMillis = 10
} else if suspendTimeMillis > 30000 {
suspendTimeMillis = 30000
}
time.Sleep(time.Duration(suspendTimeMillis) * time.Millisecond)
}
func (pc *pushConsumer) cleanExpiredMsg() {
pc.processQueueTable.Range(func(key, value interface{}) bool {
pq := value.(*processQueue)
pq.cleanExpiredMsg(pc)
return true
})
}