[ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime (#613)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 19e831b..a306470 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -50,7 +50,7 @@
 	consumeLock                sync.Mutex
 	consumingMsgOrderlyTreeMap *treemap.Map
 	dropped                    *uatomic.Bool
-	lastPullTime               time.Time
+	lastPullTime               atomic.Value
 	lastConsumeTime            atomic.Value
 	locked                     *uatomic.Bool
 	lastLockTime               atomic.Value
@@ -69,9 +69,12 @@
 	lastLockTime := atomic.Value{}
 	lastLockTime.Store(time.Now())
 
+	lastPullTime := atomic.Value{}
+	lastPullTime.Store(time.Now())
+
 	pq := &processQueue{
 		msgCache:                   treemap.NewWith(utils.Int64Comparator),
-		lastPullTime:               time.Now(),
+		lastPullTime:               lastPullTime,
 		lastConsumeTime:            lastConsumeTime,
 		lastLockTime:               lastLockTime,
 		msgCh:                      make(chan []*primitive.MessageExt, 32),
@@ -157,6 +160,14 @@
 	return pq.lastLockTime.Load().(time.Time)
 }
 
+func (pq *processQueue) LastPullTime() time.Time {
+	return pq.lastPullTime.Load().(time.Time)
+}
+
+func (pq *processQueue) UpdateLastPullTime() {
+	pq.lastPullTime.Store(time.Now())
+}
+
 func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
 	pq.mutex.Lock()
 	for _, msg := range messages {
@@ -199,7 +210,7 @@
 }
 
 func (pq *processQueue) isPullExpired() bool {
-	return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+	return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
 }
 
 func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
@@ -360,7 +371,7 @@
 		TryUnlockTimes:       pq.tryUnlockTimes,
 		LastLockTimestamp:    pq.LastLockTime().UnixNano() / int64(time.Millisecond),
 		Dropped:              pq.dropped.Load(),
-		LastPullTimestamp:    pq.lastPullTime.UnixNano() / int64(time.Millisecond),
+		LastPullTimestamp:    pq.LastPullTime().UnixNano() / int64(time.Millisecond),
 		LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
 	}
 
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 393a0e4..59bfd12 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -562,7 +562,7 @@
 		}
 		// reset time
 		sleepTime = pc.option.PullInterval
-		pq.lastPullTime = time.Now()
+		pq.lastPullTime.Store(time.Now())
 		err := pc.makeSureStateOK()
 		if err != nil {
 			rlog.Warning("consumer state error", map[string]interface{}{