fix: lots of  incorrect impl (#322)

Closes #321
diff --git a/consumer/consumer.go b/consumer/consumer.go
index fcb14d2..6ebe9ff 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -341,13 +341,7 @@
 
 func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
 	_, exist := dc.subscriptionDataTable.Load(topic)
-	// does subscribe, if true, replace it
 	if exist {
-		mqSet := make(map[int]*primitive.MessageQueue, 0)
-		for idx := range mqs {
-			mq := mqs[idx]
-			mqSet[mq.HashCode()] = mq
-		}
 		dc.topicSubscribeInfoTable.Store(topic, mqs)
 	}
 }
@@ -1058,11 +1052,11 @@
 )
 
 func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
-	pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+	pullFromWhichNodeTable.Store(*mq, brokerId)
 }
 
 func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
-	v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+	v, exist := pullFromWhichNodeTable.Load(*mq)
 	if exist {
 		return v.(int64)
 	}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6d70393..74251a1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -764,14 +764,10 @@
 	//	return
 	//}
 
-	set := make(map[int]*primitive.MessageQueue, 0)
-	for k := range table {
-		set[k.HashCode()] = &k
-	}
 	pc.processQueueTable.Range(func(key, value interface{}) bool {
-		mqHash := value.(int)
+		mq := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
-		if set[mqHash] != nil {
+		if _, ok := table[mq]; !ok {
 			pq.WithDropped(true)
 			pq.clear()
 		}
@@ -782,18 +778,16 @@
 	if !exist {
 		return
 	}
-	queuesOfTopic := v.(map[int]primitive.MessageQueue)
-	for k := range queuesOfTopic {
-		q := set[k]
-		if q != nil {
-			pc.storage.update(q, table[*q], false)
+	queuesOfTopic := v.([]primitive.MessageQueue)
+	for _, k := range queuesOfTopic {
+		if _, ok := table[k]; ok {
+			pc.storage.update(&k, table[k], false)
 			v, exist := pc.processQueueTable.Load(k)
 			if !exist {
 				continue
 			}
 			pq := v.(*processQueue)
-			pc.removeUnnecessaryMessageQueue(q, pq)
-			delete(queuesOfTopic, k)
+			pc.removeUnnecessaryMessageQueue(&k, pq)
 		}
 	}
 }