fix(route): update route (#339)
- fix producer publish update logic
- fix consumer subcription update logic
Closes #338
diff --git a/internal/client.go b/internal/client.go
index b1ad3a6..c41417d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -148,7 +148,7 @@
PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
RebalanceImmediately()
- UpdatePublishInfo(topic string, data *TopicRouteData)
+ UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
}
var _ RMQClient = new(rmqClient)
@@ -475,7 +475,8 @@
return true
})
for topic := range publishTopicSet {
- c.UpdatePublishInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+ data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ c.UpdatePublishInfo(topic, data, changed)
}
subscribedTopicSet := make(map[string]bool, 0)
@@ -489,7 +490,8 @@
})
for topic := range subscribedTopicSet {
- c.updateSubscribeInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+ data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ c.updateSubscribeInfo(topic, data, changed)
}
}
@@ -639,36 +641,27 @@
})
}
-func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
if data == nil {
return
}
- if !c.isNeedUpdatePublishInfo(topic) {
- return
- }
- c.producerMap.Range(func(key, value interface{}) bool {
- p := value.(InnerProducer)
- publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
- publishInfo.HaveTopicRouterInfo = true
- p.UpdateTopicPublishInfo(topic, publishInfo)
- return true
- })
-}
-func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
- var result bool
c.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
- if p.IsPublishTopicNeedUpdate(topic) {
- result = true
- return false
+ updated := changed
+ if !updated {
+ updated = p.IsPublishTopicNeedUpdate(topic)
+ }
+ if updated {
+ publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+ publishInfo.HaveTopicRouterInfo = true
+ p.UpdateTopicPublishInfo(topic, publishInfo)
}
return true
})
- return result
}
-func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData, changed bool) {
if data == nil {
return
}
@@ -677,8 +670,14 @@
}
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
- // TODO
- consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+ updated := changed
+ if !updated {
+ updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+ }
+ if updated {
+ consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+ }
+
return true
})
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 730c073..244cd0c 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -398,7 +398,7 @@
}
// UpdatePublishInfo mocks base method
-func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
}
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 365e784..0a983bd 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -87,11 +87,11 @@
}
// UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
ret0, _ := ret[0].(*TopicRouteData)
- return ret0
+ return ret0, changed
}
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c9cf12..43c5d5d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@
cleanOfflineBroker()
- UpdateTopicRouteInfo(topic string) *TopicRouteData
+ UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
diff --git a/internal/route.go b/internal/route.go
index 5c7b114..327706f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,7 +112,7 @@
return int(qIndex) % length
}
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
// Todo process lock timeout
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
@@ -124,7 +124,7 @@
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- return nil
+ return nil, false
}
}
@@ -132,10 +132,11 @@
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
- return nil
+ return nil, false
}
oldRouteData, exist := s.routeDataMap.Load(topic)
+
changed := true
if exist {
changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
@@ -153,7 +154,7 @@
}
}
- return routeData.clone()
+ return routeData.clone(), changed
}
func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
diff --git a/producer/producer.go b/producer/producer.go
index 013d3aa..05bcb15 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -340,7 +340,8 @@
v, exist := p.publishInfo.Load(topic)
if !exist {
- p.client.UpdatePublishInfo(topic, p.options.Namesrv.UpdateTopicRouteInfo(topic))
+ data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+ p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}