feat(consumer): move updatePullFromWhichNode ahead of result check (#390)
- consistent with java
Closes #389
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2179e10..cacbdd1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -268,6 +268,8 @@
prCh chan PullRequest
namesrv internal.Namesrvs
+
+ pullFromWhichNodeTable sync.Map
}
func (dc *defaultConsumer) start() error {
@@ -855,7 +857,7 @@
func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
- updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+ dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
switch result.Status {
case primitive.PullFound:
@@ -1041,24 +1043,20 @@
}
func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
- result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+ result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
if result != nil {
return result
}
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+ return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
}
-var (
- pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(*mq, brokerId)
+func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
+ dc.pullFromWhichNodeTable.Store(*mq, brokerId)
}
-func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(*mq)
+func (dc *defaultConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+ v, exist := dc.pullFromWhichNodeTable.Load(*mq)
if exist {
return v.(int64)
}