fix(route): update route (#339)

- fix producer publish update logic
- fix consumer subcription update logic

Closes #338
diff --git a/internal/client.go b/internal/client.go
index b1ad3a6..c41417d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -148,7 +148,7 @@
 	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
 	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
 	RebalanceImmediately()
-	UpdatePublishInfo(topic string, data *TopicRouteData)
+	UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
 
 var _ RMQClient = new(rmqClient)
@@ -475,7 +475,8 @@
 		return true
 	})
 	for topic := range publishTopicSet {
-		c.UpdatePublishInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		c.UpdatePublishInfo(topic, data, changed)
 	}
 
 	subscribedTopicSet := make(map[string]bool, 0)
@@ -489,7 +490,8 @@
 	})
 
 	for topic := range subscribedTopicSet {
-		c.updateSubscribeInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		c.updateSubscribeInfo(topic, data, changed)
 	}
 }
 
@@ -639,36 +641,27 @@
 	})
 }
 
-func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
 	if data == nil {
 		return
 	}
-	if !c.isNeedUpdatePublishInfo(topic) {
-		return
-	}
-	c.producerMap.Range(func(key, value interface{}) bool {
-		p := value.(InnerProducer)
-		publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
-		publishInfo.HaveTopicRouterInfo = true
-		p.UpdateTopicPublishInfo(topic, publishInfo)
-		return true
-	})
-}
 
-func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
-	var result bool
 	c.producerMap.Range(func(key, value interface{}) bool {
 		p := value.(InnerProducer)
-		if p.IsPublishTopicNeedUpdate(topic) {
-			result = true
-			return false
+		updated := changed
+		if !updated {
+			updated = p.IsPublishTopicNeedUpdate(topic)
+		}
+		if updated {
+			publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+			publishInfo.HaveTopicRouterInfo = true
+			p.UpdateTopicPublishInfo(topic, publishInfo)
 		}
 		return true
 	})
-	return result
 }
 
-func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData, changed bool) {
 	if data == nil {
 		return
 	}
@@ -677,8 +670,14 @@
 	}
 	c.consumerMap.Range(func(key, value interface{}) bool {
 		consumer := value.(InnerConsumer)
-		// TODO
-		consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+		updated := changed
+		if !updated {
+			updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+		}
+		if updated {
+			consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+		}
+
 		return true
 	})
 }
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 730c073..244cd0c 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -398,7 +398,7 @@
 }
 
 // UpdatePublishInfo mocks base method
-func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
 	m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
 }
 
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 365e784..0a983bd 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -87,11 +87,11 @@
 }
 
 // UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
 	ret0, _ := ret[0].(*TopicRouteData)
-	return ret0
+	return ret0, changed
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c9cf12..43c5d5d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@
 
 	cleanOfflineBroker()
 
-	UpdateTopicRouteInfo(topic string) *TopicRouteData
+	UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
 
 	FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 
diff --git a/internal/route.go b/internal/route.go
index 5c7b114..327706f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,7 +112,7 @@
 	return int(qIndex) % length
 }
 
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
 	// Todo process lock timeout
 	s.lockNamesrv.Lock()
 	defer s.lockNamesrv.Unlock()
@@ -124,7 +124,7 @@
 			rlog.Warning("query topic route from server error", map[string]interface{}{
 				rlog.LogKeyUnderlayError: err,
 			})
-			return nil
+			return nil, false
 		}
 	}
 
@@ -132,10 +132,11 @@
 		rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
 			rlog.LogKeyTopic: topic,
 		})
-		return nil
+		return nil, false
 	}
 
 	oldRouteData, exist := s.routeDataMap.Load(topic)
+
 	changed := true
 	if exist {
 		changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
@@ -153,7 +154,7 @@
 		}
 	}
 
-	return routeData.clone()
+	return routeData.clone(), changed
 }
 
 func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
diff --git a/producer/producer.go b/producer/producer.go
index 013d3aa..05bcb15 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -340,7 +340,8 @@
 
 	v, exist := p.publishInfo.Load(topic)
 	if !exist {
-		p.client.UpdatePublishInfo(topic, p.options.Namesrv.UpdateTopicRouteInfo(topic))
+		data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+		p.client.UpdatePublishInfo(topic, data, changed)
 		v, exist = p.publishInfo.Load(topic)
 	}