feat(client): add mutex for rebalance (#316)
- add rbMutex for client rebalance
- use atomic value to avoid concurrency
Closes #313
diff --git a/consumer/consumer.go b/consumer/consumer.go
index a10d3c5..103fcf1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -469,8 +469,8 @@
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
- pq.lastConsumeTime = time.Now()
- pq.lastLockTime = time.Now()
+ pq.UpdateLastConsumeTime()
+ pq.UpdateLastLockTime()
}
if _mq.Equals(mq) {
lockOK = true
@@ -532,7 +532,7 @@
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
- pq.lastConsumeTime = time.Now()
+ pq.UpdateLastConsumeTime()
}
set[_mq] = true
}
@@ -543,7 +543,7 @@
if exist {
pq := v.(*processQueue)
pq.WithLock(false)
- pq.lastLockTime = time.Now()
+ pq.UpdateLastLockTime()
rlog.Info("lock MessageQueue", map[string]interface{}{
"lockOK": false,
rlog.LogKeyConsumerGroup: dc.consumerGroup,
@@ -653,14 +653,12 @@
return result
}
-// TODO 问题不少 需要再好好对一下
func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
var changed bool
mqSet := make(map[primitive.MessageQueue]bool)
for idx := range mqs {
mqSet[*mqs[idx]] = true
}
- // TODO
dc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 4f4dc0c..3ab5b12 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -50,9 +50,9 @@
queueOffsetMax int64
dropped *uatomic.Bool
lastPullTime time.Time
- lastConsumeTime time.Time
+ lastConsumeTime atomic.Value
locked *uatomic.Bool
- lastLockTime time.Time
+ lastLockTime atomic.Value
consuming bool
msgAccCnt int64
lockConsume sync.Mutex
@@ -63,11 +63,17 @@
func newProcessQueue(order bool) *processQueue {
consumingMsgOrderlyTreeMap := treemap.NewWith(gods_util.Int64Comparator)
+ lastConsumeTime := atomic.Value{}
+ lastConsumeTime.Store(time.Now())
+
+ lastLockTime := atomic.Value{}
+ lastLockTime.Store(time.Now())
+
pq := &processQueue{
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: time.Now(),
- lastConsumeTime: time.Now(),
- lastLockTime: time.Now(),
+ lastConsumeTime: lastConsumeTime,
+ lastLockTime: lastLockTime,
msgCh: make(chan []*primitive.MessageExt, 32),
consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
order: order,
@@ -131,6 +137,22 @@
return pq.dropped.Load()
}
+func (pq *processQueue) UpdateLastConsumeTime() {
+ pq.lastConsumeTime.Store(time.Now())
+}
+
+func (pq *processQueue) LastConsumeTime() time.Time {
+ return pq.lastConsumeTime.Load().(time.Time)
+}
+
+func (pq *processQueue) UpdateLastLockTime() {
+ pq.lastLockTime.Store(time.Now())
+}
+
+func (pq *processQueue) LastLockTime() time.Time {
+ return pq.lastLockTime.Load().(time.Time)
+}
+
func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
pq.mutex.Lock()
for _, msg := range messages {
@@ -144,7 +166,7 @@
func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
result := int64(-1)
pq.mutex.Lock()
- pq.lastConsumeTime = time.Now()
+ pq.UpdateLastConsumeTime()
if !pq.msgCache.Empty() {
result = pq.queueOffsetMax + 1
removedCount := 0
@@ -169,7 +191,7 @@
}
func (pq *processQueue) isLockExpired() bool {
- return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
+ return time.Now().Sub(pq.LastLockTime()) > _RebalanceLockMaxTime
}
func (pq *processQueue) isPullExpired() bool {
@@ -332,10 +354,10 @@
info := internal.ProcessQueueInfo{
Locked: pq.locked.Load(),
TryUnlockTimes: pq.tryUnlockTimes,
- LastLockTimestamp: pq.lastLockTime.UnixNano() / 10e6,
+ LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
Dropped: pq.dropped.Load(),
- LastPullTimestamp: pq.lastPullTime.UnixNano() / 10e6,
- LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
+ LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
+ LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
}
if !pq.msgCache.Empty() {
diff --git a/internal/client.go b/internal/client.go
index d9c5f6a..c952385 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -164,6 +164,7 @@
remoteClient remote.RemotingClient
hbMutex sync.Mutex
close bool
+ rbMutex sync.Mutex
namesrvs *namesrvs
done chan struct{}
shutdownOnce sync.Once
@@ -618,6 +619,8 @@
}
func (c *rmqClient) RebalanceImmediately() {
+ c.rbMutex.Lock()
+ defer c.rbMutex.Unlock()
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
consumer.Rebalance()