fix: lots of incorrect impl (#322)
Closes #321
diff --git a/consumer/consumer.go b/consumer/consumer.go
index fcb14d2..6ebe9ff 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -341,13 +341,7 @@
func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
_, exist := dc.subscriptionDataTable.Load(topic)
- // does subscribe, if true, replace it
if exist {
- mqSet := make(map[int]*primitive.MessageQueue, 0)
- for idx := range mqs {
- mq := mqs[idx]
- mqSet[mq.HashCode()] = mq
- }
dc.topicSubscribeInfoTable.Store(topic, mqs)
}
}
@@ -1058,11 +1052,11 @@
)
func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+ pullFromWhichNodeTable.Store(*mq, brokerId)
}
func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+ v, exist := pullFromWhichNodeTable.Load(*mq)
if exist {
return v.(int64)
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6d70393..74251a1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -764,14 +764,10 @@
// return
//}
- set := make(map[int]*primitive.MessageQueue, 0)
- for k := range table {
- set[k.HashCode()] = &k
- }
pc.processQueueTable.Range(func(key, value interface{}) bool {
- mqHash := value.(int)
+ mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
- if set[mqHash] != nil {
+ if _, ok := table[mq]; !ok {
pq.WithDropped(true)
pq.clear()
}
@@ -782,18 +778,16 @@
if !exist {
return
}
- queuesOfTopic := v.(map[int]primitive.MessageQueue)
- for k := range queuesOfTopic {
- q := set[k]
- if q != nil {
- pc.storage.update(q, table[*q], false)
+ queuesOfTopic := v.([]primitive.MessageQueue)
+ for _, k := range queuesOfTopic {
+ if _, ok := table[k]; ok {
+ pc.storage.update(&k, table[k], false)
v, exist := pc.processQueueTable.Load(k)
if !exist {
continue
}
pq := v.(*processQueue)
- pc.removeUnnecessaryMessageQueue(q, pq)
- delete(queuesOfTopic, k)
+ pc.removeUnnecessaryMessageQueue(&k, pq)
}
}
}