blob: aaf5849b83b4a446cb0d88e3a0041b284f714de2 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package model
import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/emirpasic/gods/maps/treemap"
"github.com/golang/glog"
"sync"
"time"
)
//ProcessQueue message process queue
type ProcessQueue struct {
msgTreeMap *treemap.Map // int | MessageExtImpl
msgCount int
lockTreeMap sync.RWMutex
locked bool
lastPullTimestamp time.Time
lastConsumeTimestamp time.Time
lastLockTimestamp time.Time
lockConsume sync.RWMutex
consuming bool
queueOffsetMax int64
dropped bool
msgAccCnt int64 //accumulation message count
tryUnlockTimes int64
msgTreeMapToBeConsume *treemap.Map
}
//NewProcessQueue create a ProcessQueue
func NewProcessQueue() (processQueue *ProcessQueue) {
processQueue = new(ProcessQueue)
processQueue.dropped = false
processQueue.msgTreeMap = treemap.NewWithIntComparator()
processQueue.msgTreeMapToBeConsume = treemap.NewWithIntComparator()
return
}
//GetMsgCount get message count
func (p *ProcessQueue) GetMsgCount() int {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
return p.msgCount
}
//Clear clear
func (p *ProcessQueue) Clear() {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
p.SetDrop(true)
p.msgTreeMap.Clear()
p.msgCount = 0
p.queueOffsetMax = 0
}
//ChangeToProcessQueueInfo changeToProcessQueueInfo
func (p *ProcessQueue) ChangeToProcessQueueInfo() (processQueueInfo ProcessQueueInfo) {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
processQueueInfo = ProcessQueueInfo{}
minOffset := -1
maxOffset := -1
minKey, _ := p.msgTreeMap.Min()
if minKey != nil {
minOffset = minKey.(int)
}
maxKey, _ := p.msgTreeMap.Max()
if maxKey != nil {
maxOffset = maxKey.(int)
}
processQueueInfo.CachedMsgCount = int32(p.msgCount)
processQueueInfo.CachedMsgMinOffset = int64(maxOffset)
processQueueInfo.CachedMsgMaxOffset = int64(minOffset)
processQueueInfo.Droped = p.dropped
processQueueInfo.LastConsumeTimestamp = p.lastConsumeTimestamp.UnixNano()
processQueueInfo.LastPullTimestamp = p.lastPullTimestamp.UnixNano()
return
}
//DeleteExpireMsg deleteExpireMsg
func (p *ProcessQueue) DeleteExpireMsg(queueOffset int) {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
key, _ := p.msgTreeMap.Min()
if key == nil {
return
}
offset := key.(int)
glog.V(2).Infof("look min key and offset %d %s", offset, queueOffset)
if queueOffset == offset {
p.msgTreeMap.Remove(queueOffset)
p.msgCount = p.msgTreeMap.Size()
}
}
//GetMinMessageInTree getMinMessageInTree
func (p *ProcessQueue) GetMinMessageInTree() (offset int, messagePoint *message.MessageExtImpl) {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
key, value := p.msgTreeMap.Min()
if key == nil || value == nil {
return
}
offset = key.(int)
message := value.(message.MessageExtImpl)
messagePoint = &message
return
}
//SetDrop set this queue is dropped
func (p *ProcessQueue) SetDrop(drop bool) {
p.dropped = drop
}
//IsDropped judge whether this queue is dropped
func (p *ProcessQueue) IsDropped() bool {
return p.dropped
}
//GetMaxSpan getMaxSpan
func (p *ProcessQueue) GetMaxSpan() int {
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
if p.msgTreeMap.Empty() {
return 0
}
minKey, _ := p.msgTreeMap.Min()
minOffset := minKey.(int)
maxKey, _ := p.msgTreeMap.Max()
maxOffset := maxKey.(int)
return maxOffset - minOffset
}
//RemoveMessage from this process queue
func (p *ProcessQueue) RemoveMessage(msgs []message.MessageExtImpl) (offset int64) {
now := time.Now()
offset = -1
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
p.lastConsumeTimestamp = now
if p.msgCount > 0 {
maxKey, _ := p.msgTreeMap.Max()
offset = int64(maxKey.(int)) + 1
for _, msg := range msgs {
p.msgTreeMap.Remove(int(msg.QueueOffset))
}
p.msgCount = p.msgTreeMap.Size()
if p.msgCount > 0 {
minKey, _ := p.msgTreeMap.Min()
offset = int64(minKey.(int))
}
}
return
}
//PutMessage put message into this process queue
func (p *ProcessQueue) PutMessage(msgs []message.MessageExtImpl) (dispatchToConsume bool) {
dispatchToConsume = false
msgsLen := len(msgs)
if msgsLen == 0 {
return
}
defer p.lockTreeMap.Unlock()
p.lockTreeMap.Lock()
for _, msg := range msgs {
p.msgTreeMap.Put(int(msg.QueueOffset), msg)
}
p.msgCount = p.msgTreeMap.Size()
maxOffset, _ := p.msgTreeMap.Max()
p.queueOffsetMax = int64(maxOffset.(int))
if p.msgCount > 0 && !p.consuming {
dispatchToConsume = true
p.consuming = true
}
lastMsg := msgs[msgsLen-1]
remoteMaxOffset := util.StrToInt64WithDefaultValue(lastMsg.PropertiesKeyValue(constant.PROPERTY_MAX_OFFSET), -1)
if remoteMaxOffset > 0 {
accTotal := remoteMaxOffset - lastMsg.QueueOffset
if accTotal > 0 {
p.msgAccCnt = accTotal
}
}
return
}