feat(consumer): advance code (#309)
- use atomic.Bool instead of bool
- be consistent with the Java client implementation
- use map[primitive.MessageQueue]bool instead of map[int]bool, and drop the redundant 0 size hint in make
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 6d1bd53..13cef4e 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -297,7 +297,7 @@
dc.processQueueTable.Range(func(key, value interface{}) bool {
k := key.(primitive.MessageQueue)
pq := value.(*processQueue)
- pq.dropped = true
+ pq.WithDropped(true)
mqs = append(mqs, &k)
return true
})
@@ -465,7 +465,7 @@
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
- pq.locked = true
+ pq.WithLock(true)
pq.lastConsumeTime = time.Now()
pq.lastLockTime = time.Now()
}
@@ -522,24 +522,24 @@
MQs: mqs,
}
lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
- set := make(map[int]bool, 0)
+ set := make(map[primitive.MessageQueue]bool)
for idx := range lockedMQ {
_mq := lockedMQ[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
- pq.locked = true
+ pq.WithLock(true)
pq.lastConsumeTime = time.Now()
}
- set[_mq.HashCode()] = true
+ set[_mq] = true
}
for idx := range mqs {
_mq := mqs[idx]
- if !set[_mq.HashCode()] {
+ if !set[*_mq] {
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
- pq.locked = true
+ pq.WithLock(false)
pq.lastLockTime = time.Now()
rlog.Info("lock MessageQueue", map[string]interface{}{
"lockOK": false,
@@ -572,12 +572,12 @@
_mq := mqs[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
- v.(*processQueue).locked = false
rlog.Info("lock MessageQueue", map[string]interface{}{
"lockOK": false,
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: _mq.String(),
})
+ v.(*processQueue).WithLock(false)
}
}
}
@@ -597,6 +597,9 @@
lockOKMQSet := struct {
MQs []primitive.MessageQueue `json:"lockOKMQSet"`
}{}
+ if len(response.Body) == 0 {
+ return nil
+ }
err = json.Unmarshal(response.Body, &lockOKMQSet)
if err != nil {
rlog.Error("Unmarshal lock mq body error", map[string]interface{}{
@@ -660,9 +663,8 @@
pq := value.(*processQueue)
if mq.Topic == topic {
if !mqSet[mq] {
- pq.dropped = true
+ pq.WithDropped(true)
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
- //delete(mqSet, mq)
dc.processQueueTable.Delete(key)
changed = true
rlog.Debug("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{
@@ -671,9 +673,9 @@
})
}
} else if pq.isPullExpired() && dc.cType == _PushConsume {
- pq.dropped = true
+ pq.WithDropped(true)
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
- delete(mqSet, mq)
+ dc.processQueueTable.Delete(key)
changed = true
rlog.Debug("remove unnecessary mq because pull was paused, prepare to fix it", map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index eeabbb2..13b2d25 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -23,11 +23,13 @@
"sync/atomic"
"time"
- "github.com/apache/rocketmq-client-go/primitive"
- "github.com/apache/rocketmq-client-go/rlog"
"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
gods_util "github.com/emirpasic/gods/utils"
+ uatomic "go.uber.org/atomic"
+
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/rlog"
)
const (
@@ -45,10 +47,10 @@
consumingMsgOrderlyTreeMap *treemap.Map
tryUnlockTimes int64
queueOffsetMax int64
- dropped bool
+ dropped *uatomic.Bool
lastPullTime time.Time
lastConsumeTime time.Time
- locked bool
+ locked *uatomic.Bool
lastLockTime time.Time
consuming bool
msgAccCnt int64
@@ -68,6 +70,8 @@
msgCh: make(chan []*primitive.MessageExt, 32),
consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
order: order,
+ locked: uatomic.NewBool(false),
+ dropped: uatomic.NewBool(false),
}
return pq
}
@@ -110,6 +114,22 @@
}
}
+func (pq *processQueue) WithLock(lock bool) {
+ pq.locked.Store(lock)
+}
+
+func (pq *processQueue) IsLock() bool {
+ return pq.locked.Load()
+}
+
+func (pq *processQueue) WithDropped(dropped bool) {
+ pq.dropped.Store(dropped)
+}
+
+func (pq *processQueue) IsDroppd() bool {
+ return pq.dropped.Load()
+}
+
func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
pq.mutex.Lock()
for _, msg := range messages {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index f1aa9a8..aa999c5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -406,7 +406,7 @@
default:
}
- if pq.dropped {
+ if pq.IsDroppd() {
rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
@@ -486,7 +486,7 @@
goto NEXT
}
} else {
- if pq.locked {
+ if pq.IsLock() {
if !request.lockedFirst {
offset := pc.computePullFromWhere(request.mq)
brokerBusy := offset < request.nextOffset
@@ -624,7 +624,7 @@
"result": result.String(),
})
request.nextOffset = result.NextBeginOffset
- pq.dropped = true
+ pq.WithDropped(true)
go func() {
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
@@ -716,7 +716,7 @@
mqHash := value.(int)
pq := value.(*processQueue)
if set[mqHash] != nil {
- pq.dropped = true
+ pq.WithDropped(true)
pq.clear()
}
return true
@@ -804,7 +804,7 @@
}
go func() {
RETRY:
- if pq.dropped {
+ if pq.IsDroppd() {
rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
@@ -845,7 +845,7 @@
increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
- if !pq.dropped {
+ if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
@@ -870,7 +870,7 @@
offset := pq.removeMessage(subMsgs...)
- if offset >= 0 && !pq.dropped {
+ if offset >= 0 && !pq.IsDroppd() {
pc.storage.update(mq, int64(offset), true)
}
if len(msgBackFailed) > 0 {
@@ -889,7 +889,7 @@
}
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
- if pq.dropped {
+ if pq.IsDroppd() {
rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
@@ -899,19 +899,19 @@
lock := pc.queueLock.fetchLock(*mq)
lock.Lock()
defer lock.Unlock()
- if pc.model == BroadCasting || (pq.locked && !pq.isLockExpired()) {
+ if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
beginTime := time.Now()
continueConsume := true
for continueConsume {
- if pq.dropped {
+ if pq.IsDroppd() {
rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
break
}
if pc.model == Clustering {
- if !pq.locked {
+ if !pq.IsLock() {
rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})
@@ -1018,12 +1018,12 @@
default:
}
}
- if commitOffset > 0 && !pq.dropped {
+ if commitOffset > 0 && !pq.IsDroppd() {
_ = pc.updateOffset(mq, commitOffset)
}
}
} else {
- if pq.dropped {
+ if pq.IsDroppd() {
rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
})