feat(consumer): advance code (#309)

- use atomic.Bool instead of bool
- be consistent with java
- use map[primitive.MessageQueue]bool instead of map[int]bool, 0
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 6d1bd53..13cef4e 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -297,7 +297,7 @@
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
 		k := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
-		pq.dropped = true
+		pq.WithDropped(true)
 		mqs = append(mqs, &k)
 		return true
 	})
@@ -465,7 +465,7 @@
 		v, exist := dc.processQueueTable.Load(_mq)
 		if exist {
 			pq := v.(*processQueue)
-			pq.locked = true
+			pq.WithLock(true)
 			pq.lastConsumeTime = time.Now()
 			pq.lastLockTime = time.Now()
 		}
@@ -522,24 +522,24 @@
 			MQs:           mqs,
 		}
 		lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
-		set := make(map[int]bool, 0)
+		set := make(map[primitive.MessageQueue]bool)
 		for idx := range lockedMQ {
 			_mq := lockedMQ[idx]
 			v, exist := dc.processQueueTable.Load(_mq)
 			if exist {
 				pq := v.(*processQueue)
-				pq.locked = true
+				pq.WithLock(true)
 				pq.lastConsumeTime = time.Now()
 			}
-			set[_mq.HashCode()] = true
+			set[_mq] = true
 		}
 		for idx := range mqs {
 			_mq := mqs[idx]
-			if !set[_mq.HashCode()] {
+			if !set[*_mq] {
 				v, exist := dc.processQueueTable.Load(_mq)
 				if exist {
 					pq := v.(*processQueue)
-					pq.locked = true
+					pq.WithLock(false)
 					pq.lastLockTime = time.Now()
 					rlog.Info("lock MessageQueue", map[string]interface{}{
 						"lockOK":                 false,
@@ -572,12 +572,12 @@
 			_mq := mqs[idx]
 			v, exist := dc.processQueueTable.Load(_mq)
 			if exist {
-				v.(*processQueue).locked = false
 				rlog.Info("lock MessageQueue", map[string]interface{}{
 					"lockOK":                 false,
 					rlog.LogKeyConsumerGroup: dc.consumerGroup,
 					rlog.LogKeyMessageQueue:  _mq.String(),
 				})
+				v.(*processQueue).WithLock(false)
 			}
 		}
 	}
@@ -597,6 +597,9 @@
 	lockOKMQSet := struct {
 		MQs []primitive.MessageQueue `json:"lockOKMQSet"`
 	}{}
+	if len(response.Body) == 0 {
+		return nil
+	}
 	err = json.Unmarshal(response.Body, &lockOKMQSet)
 	if err != nil {
 		rlog.Error("Unmarshal lock mq body error", map[string]interface{}{
@@ -660,9 +663,8 @@
 		pq := value.(*processQueue)
 		if mq.Topic == topic {
 			if !mqSet[mq] {
-				pq.dropped = true
+				pq.WithDropped(true)
 				if dc.removeUnnecessaryMessageQueue(&mq, pq) {
-					//delete(mqSet, mq)
 					dc.processQueueTable.Delete(key)
 					changed = true
 					rlog.Debug("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{
@@ -671,9 +673,9 @@
 					})
 				}
 			} else if pq.isPullExpired() && dc.cType == _PushConsume {
-				pq.dropped = true
+				pq.WithDropped(true)
 				if dc.removeUnnecessaryMessageQueue(&mq, pq) {
-					delete(mqSet, mq)
+					dc.processQueueTable.Delete(key)
 					changed = true
 					rlog.Debug("remove unnecessary mq because pull was paused, prepare to fix it", map[string]interface{}{
 						rlog.LogKeyConsumerGroup: dc.consumerGroup,
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index eeabbb2..13b2d25 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -23,11 +23,13 @@
 	"sync/atomic"
 	"time"
 
-	"github.com/apache/rocketmq-client-go/primitive"
-	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/emirpasic/gods/maps/treemap"
 	"github.com/emirpasic/gods/utils"
 	gods_util "github.com/emirpasic/gods/utils"
+	uatomic "go.uber.org/atomic"
+
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/rlog"
 )
 
 const (
@@ -45,10 +47,10 @@
 	consumingMsgOrderlyTreeMap *treemap.Map
 	tryUnlockTimes             int64
 	queueOffsetMax             int64
-	dropped                    bool
+	dropped                    *uatomic.Bool
 	lastPullTime               time.Time
 	lastConsumeTime            time.Time
-	locked                     bool
+	locked                     *uatomic.Bool
 	lastLockTime               time.Time
 	consuming                  bool
 	msgAccCnt                  int64
@@ -68,6 +70,8 @@
 		msgCh:                      make(chan []*primitive.MessageExt, 32),
 		consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
 		order:                      order,
+		locked:                     uatomic.NewBool(false),
+		dropped:                    uatomic.NewBool(false),
 	}
 	return pq
 }
@@ -110,6 +114,22 @@
 	}
 }
 
+func (pq *processQueue) WithLock(lock bool) {
+	pq.locked.Store(lock)
+}
+
+func (pq *processQueue) IsLock() bool {
+	return pq.locked.Load()
+}
+
+func (pq *processQueue) WithDropped(dropped bool) {
+	pq.dropped.Store(dropped)
+}
+
+func (pq *processQueue) IsDroppd() bool {
+	return pq.dropped.Load()
+}
+
 func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
 	pq.mutex.Lock()
 	for _, msg := range messages {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index f1aa9a8..aa999c5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -406,7 +406,7 @@
 		default:
 		}
 
-		if pq.dropped {
+		if pq.IsDroppd() {
 			rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
 				rlog.LogKeyPullRequest: request.String(),
 			})
@@ -486,7 +486,7 @@
 				goto NEXT
 			}
 		} else {
-			if pq.locked {
+			if pq.IsLock() {
 				if !request.lockedFirst {
 					offset := pc.computePullFromWhere(request.mq)
 					brokerBusy := offset < request.nextOffset
@@ -624,7 +624,7 @@
 				"result":               result.String(),
 			})
 			request.nextOffset = result.NextBeginOffset
-			pq.dropped = true
+			pq.WithDropped(true)
 			go func() {
 				time.Sleep(10 * time.Second)
 				pc.storage.update(request.mq, request.nextOffset, false)
@@ -716,7 +716,7 @@
 		mqHash := value.(int)
 		pq := value.(*processQueue)
 		if set[mqHash] != nil {
-			pq.dropped = true
+			pq.WithDropped(true)
 			pq.clear()
 		}
 		return true
@@ -804,7 +804,7 @@
 		}
 		go func() {
 		RETRY:
-			if pq.dropped {
+			if pq.IsDroppd() {
 				rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
 					rlog.LogKeyMessageQueue:  mq.String(),
 					rlog.LogKeyConsumerGroup: pc.consumerGroup,
@@ -845,7 +845,7 @@
 
 			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
-			if !pq.dropped {
+			if !pq.IsDroppd() {
 				msgBackFailed := make([]*primitive.MessageExt, 0)
 				if result == ConsumeSuccess {
 					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
@@ -870,7 +870,7 @@
 
 				offset := pq.removeMessage(subMsgs...)
 
-				if offset >= 0 && !pq.dropped {
+				if offset >= 0 && !pq.IsDroppd() {
 					pc.storage.update(mq, int64(offset), true)
 				}
 				if len(msgBackFailed) > 0 {
@@ -889,7 +889,7 @@
 }
 
 func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
-	if pq.dropped {
+	if pq.IsDroppd() {
 		rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
 			rlog.LogKeyMessageQueue: mq.String(),
 		})
@@ -899,19 +899,19 @@
 	lock := pc.queueLock.fetchLock(*mq)
 	lock.Lock()
 	defer lock.Unlock()
-	if pc.model == BroadCasting || (pq.locked && !pq.isLockExpired()) {
+	if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
 		beginTime := time.Now()
 
 		continueConsume := true
 		for continueConsume {
-			if pq.dropped {
+			if pq.IsDroppd() {
 				rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
 					rlog.LogKeyMessageQueue: mq.String(),
 				})
 				break
 			}
 			if pc.model == Clustering {
-				if !pq.locked {
+				if !pq.IsLock() {
 					rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
 						rlog.LogKeyMessageQueue: mq.String(),
 					})
@@ -1018,12 +1018,12 @@
 				default:
 				}
 			}
-			if commitOffset > 0 && !pq.dropped {
+			if commitOffset > 0 && !pq.IsDroppd() {
 				_ = pc.updateOffset(mq, commitOffset)
 			}
 		}
 	} else {
-		if pq.dropped {
+		if pq.IsDroppd() {
 			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
 				rlog.LogKeyMessageQueue: mq.String(),
 			})