[ISSUE #568] Store lastPullTime in an atomic.Value, consistent with lastConsumeTime and lastLockTime (#613)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 19e831b..a306470 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -50,7 +50,7 @@
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap *treemap.Map
dropped *uatomic.Bool
- lastPullTime time.Time
+ lastPullTime atomic.Value
lastConsumeTime atomic.Value
locked *uatomic.Bool
lastLockTime atomic.Value
@@ -69,9 +69,12 @@
lastLockTime := atomic.Value{}
lastLockTime.Store(time.Now())
+ lastPullTime := atomic.Value{}
+ lastPullTime.Store(time.Now())
+
pq := &processQueue{
msgCache: treemap.NewWith(utils.Int64Comparator),
- lastPullTime: time.Now(),
+ lastPullTime: lastPullTime,
lastConsumeTime: lastConsumeTime,
lastLockTime: lastLockTime,
msgCh: make(chan []*primitive.MessageExt, 32),
@@ -157,6 +160,14 @@
return pq.lastLockTime.Load().(time.Time)
}
+// LastPullTime returns the time.Time currently held in the lastPullTime
+// atomic.Value. The type assertion panics if no value has been stored yet.
+func (pq *processQueue) LastPullTime() time.Time {
+ return pq.lastPullTime.Load().(time.Time)
+}
+
+// UpdateLastPullTime stores the current time (time.Now()) in lastPullTime.
+func (pq *processQueue) UpdateLastPullTime() {
+ pq.lastPullTime.Store(time.Now())
+}
+
func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
pq.mutex.Lock()
for _, msg := range messages {
@@ -199,7 +210,7 @@
}
+// isPullExpired reports whether the time elapsed since LastPullTime()
+// exceeds _PullMaxIdleTime.
func (pq *processQueue) isPullExpired() bool {
- return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+ return time.Since(pq.LastPullTime()) > _PullMaxIdleTime
}
func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
@@ -360,7 +371,7 @@
TryUnlockTimes: pq.tryUnlockTimes,
LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
Dropped: pq.dropped.Load(),
- LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
+ LastPullTimestamp: pq.LastPullTime().UnixNano() / int64(time.Millisecond),
LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 393a0e4..59bfd12 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -562,7 +562,7 @@
}
// reset time
sleepTime = pc.option.PullInterval
- pq.lastPullTime = time.Now()
+ pq.UpdateLastPullTime()
err := pc.makeSureStateOK()
if err != nil {
rlog.Warning("consumer state error", map[string]interface{}{