diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2179e10..cacbdd1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -268,6 +268,8 @@
 	prCh chan PullRequest
 
 	namesrv internal.Namesrvs
+
+	pullFromWhichNodeTable sync.Map
 }
 
 func (dc *defaultConsumer) start() error {
@@ -855,7 +857,7 @@
 
 func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
 
-	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
 
 	switch result.Status {
 	case primitive.PullFound:
@@ -1041,24 +1043,20 @@
 }
 
 func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
-	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
 	if result != nil {
 		return result
 	}
 	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
 }
 
-var (
-	pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
-	pullFromWhichNodeTable.Store(*mq, brokerId)
+func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
+	dc.pullFromWhichNodeTable.Store(*mq, brokerId)
 }
 
-func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
-	v, exist := pullFromWhichNodeTable.Load(*mq)
+func (dc *defaultConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+	v, exist := dc.pullFromWhichNodeTable.Load(*mq)
 	if exist {
 		return v.(int64)
 	}
