feat(client): add mutex for rebalance (#316)

- add rbMutex to serialize client rebalance operations
- use atomic.Value for lastConsumeTime/lastLockTime to avoid data races

Closes #313
diff --git a/consumer/consumer.go b/consumer/consumer.go
index a10d3c5..103fcf1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -469,8 +469,8 @@
 		if exist {
 			pq := v.(*processQueue)
 			pq.WithLock(true)
-			pq.lastConsumeTime = time.Now()
-			pq.lastLockTime = time.Now()
+			pq.UpdateLastConsumeTime()
+			pq.UpdateLastLockTime()
 		}
 		if _mq.Equals(mq) {
 			lockOK = true
@@ -532,7 +532,7 @@
 			if exist {
 				pq := v.(*processQueue)
 				pq.WithLock(true)
-				pq.lastConsumeTime = time.Now()
+				pq.UpdateLastConsumeTime()
 			}
 			set[_mq] = true
 		}
@@ -543,7 +543,7 @@
 				if exist {
 					pq := v.(*processQueue)
 					pq.WithLock(false)
-					pq.lastLockTime = time.Now()
+					pq.UpdateLastLockTime()
 					rlog.Info("lock MessageQueue", map[string]interface{}{
 						"lockOK":                 false,
 						rlog.LogKeyConsumerGroup: dc.consumerGroup,
@@ -653,14 +653,12 @@
 	return result
 }
 
-// TODO 问题不少 需要再好好对一下
 func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
 	var changed bool
 	mqSet := make(map[primitive.MessageQueue]bool)
 	for idx := range mqs {
 		mqSet[*mqs[idx]] = true
 	}
-	// TODO
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
 		mq := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 4f4dc0c..3ab5b12 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -50,9 +50,9 @@
 	queueOffsetMax             int64
 	dropped                    *uatomic.Bool
 	lastPullTime               time.Time
-	lastConsumeTime            time.Time
+	lastConsumeTime            atomic.Value
 	locked                     *uatomic.Bool
-	lastLockTime               time.Time
+	lastLockTime               atomic.Value
 	consuming                  bool
 	msgAccCnt                  int64
 	lockConsume                sync.Mutex
@@ -63,11 +63,17 @@
 func newProcessQueue(order bool) *processQueue {
 	consumingMsgOrderlyTreeMap := treemap.NewWith(gods_util.Int64Comparator)
 
+	lastConsumeTime := atomic.Value{}
+	lastConsumeTime.Store(time.Now())
+
+	lastLockTime := atomic.Value{}
+	lastLockTime.Store(time.Now())
+
 	pq := &processQueue{
 		msgCache:                   treemap.NewWith(utils.Int64Comparator),
 		lastPullTime:               time.Now(),
-		lastConsumeTime:            time.Now(),
-		lastLockTime:               time.Now(),
+		lastConsumeTime:            lastConsumeTime,
+		lastLockTime:               lastLockTime,
 		msgCh:                      make(chan []*primitive.MessageExt, 32),
 		consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
 		order:                      order,
@@ -131,6 +137,22 @@
 	return pq.dropped.Load()
 }
 
+func (pq *processQueue) UpdateLastConsumeTime() {
+	pq.lastConsumeTime.Store(time.Now())
+}
+
+func (pq *processQueue) LastConsumeTime() time.Time {
+	return pq.lastConsumeTime.Load().(time.Time)
+}
+
+func (pq *processQueue) UpdateLastLockTime() {
+	pq.lastLockTime.Store(time.Now())
+}
+
+func (pq *processQueue) LastLockTime() time.Time {
+	return pq.lastLockTime.Load().(time.Time)
+}
+
 func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
 	pq.mutex.Lock()
 	for _, msg := range messages {
@@ -144,7 +166,7 @@
 func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
 	result := int64(-1)
 	pq.mutex.Lock()
-	pq.lastConsumeTime = time.Now()
+	pq.UpdateLastConsumeTime()
 	if !pq.msgCache.Empty() {
 		result = pq.queueOffsetMax + 1
 		removedCount := 0
@@ -169,7 +191,7 @@
 }
 
 func (pq *processQueue) isLockExpired() bool {
-	return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
+	return time.Now().Sub(pq.LastLockTime()) > _RebalanceLockMaxTime
 }
 
 func (pq *processQueue) isPullExpired() bool {
@@ -332,10 +354,10 @@
 	info := internal.ProcessQueueInfo{
 		Locked:               pq.locked.Load(),
 		TryUnlockTimes:       pq.tryUnlockTimes,
-		LastLockTimestamp:    pq.lastLockTime.UnixNano() / 10e6,
+		LastLockTimestamp:    pq.LastLockTime().UnixNano() / int64(time.Millisecond),
 		Dropped:              pq.dropped.Load(),
-		LastPullTimestamp:    pq.lastPullTime.UnixNano() / 10e6,
-		LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
+		LastPullTimestamp:    pq.lastPullTime.UnixNano() / int64(time.Millisecond),
+		LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
 	}
 
 	if !pq.msgCache.Empty() {
diff --git a/internal/client.go b/internal/client.go
index d9c5f6a..c952385 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -164,6 +164,7 @@
 	remoteClient remote.RemotingClient
 	hbMutex      sync.Mutex
 	close        bool
+	rbMutex      sync.Mutex
 	namesrvs     *namesrvs
 	done         chan struct{}
 	shutdownOnce sync.Once
@@ -618,6 +619,8 @@
 }
 
 func (c *rmqClient) RebalanceImmediately() {
+	c.rbMutex.Lock()
+	defer c.rbMutex.Unlock()
 	c.consumerMap.Range(func(key, value interface{}) bool {
 		consumer := value.(InnerConsumer)
 		consumer.Rebalance()