fix the bug caused in HA cluster when master broker node is down (#714)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84ce84..e3e0d32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -99,8 +99,12 @@
 		consumeOrderly: defaultOpts.ConsumeOrderly,
 		fromWhere:      defaultOpts.FromWhere,
 		allocate:       defaultOpts.Strategy,
-		option:         defaultOpts,
 		namesrv:        srvs,
+		option:         defaultOpts,
+	}
+	dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
+	if err != nil {
+		return nil, err
 	}
 
 	p := &pushConsumer{
diff --git a/internal/client.go b/internal/client.go
index 3a09ea8..4de3b93 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -186,6 +186,9 @@
 		done:         make(chan struct{}),
 	}
 	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+	client.namesrvs = GetOrSetNamesrv(client.ClientID(), client.namesrvs)
+	client.namesrvs.bundleClient = actual.(*rmqClient)
+	client.option.Namesrv = client.namesrvs
 	if !loaded {
 		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
 			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
diff --git a/internal/namesrv.go b/internal/namesrv.go
index a47bbc1..7776651 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -19,6 +19,7 @@
 
 import (
 	"errors"
+	"fmt"
 	"regexp"
 	"strings"
 	"sync"
@@ -76,6 +77,8 @@
 	// brokerName -> *BrokerData
 	brokerAddressesMap sync.Map
 
+	bundleClient *rmqClient
+
 	// brokerName -> map[string]int32: brokerAddr -> version
 	brokerVersionMap map[string]map[string]int32
 	// lock for broker version read/write
@@ -92,9 +95,21 @@
 }
 
 var _ Namesrvs = (*namesrvs)(nil)
+var namesrvMap sync.Map
 
 // NewNamesrv init Namesrv from namesrv addr string.
 // addr primitive.NamesrvAddr
+func GetOrSetNamesrv(clientId string, namesrv *namesrvs) *namesrvs {
+	actual, _ := namesrvMap.LoadOrStore(clientId, namesrv)
+	return actual.(*namesrvs)
+}
+func GetNamesrv(clientId string) (*namesrvs, error) {
+	actual, ok := namesrvMap.Load(clientId)
+	if !ok {
+		return nil, fmt.Errorf("the namesrv in instanceName [%s] not found", clientId)
+	}
+	return actual.(*namesrvs), nil
+}
 func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
 	addr := resolver.Resolve()
 	if len(addr) == 0 {
diff --git a/internal/route.go b/internal/route.go
index 66be96d..3676cb4 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -161,6 +161,34 @@
 	}
 
 	if changed {
+		if s.bundleClient != nil {
+			s.bundleClient.producerMap.Range(func(key, value interface{}) bool {
+				p := value.(InnerProducer)
+				updated := changed
+				if !updated {
+					updated = p.IsPublishTopicNeedUpdate(topic)
+				}
+				if updated {
+					publishInfo := s.bundleClient.namesrvs.routeData2PublishInfo(topic, routeData)
+					publishInfo.HaveTopicRouterInfo = true
+					p.UpdateTopicPublishInfo(topic, publishInfo)
+				}
+				return true
+			})
+			s.bundleClient.consumerMap.Range(func(key, value interface{}) bool {
+				consumer := value.(InnerConsumer)
+				updated := changed
+				if !updated {
+					updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+				}
+				if updated {
+					consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, routeData))
+				}
+
+				return true
+			})
+		}
+
 		s.routeDataMap.Store(topic, routeData)
 		rlog.Info("the topic route info changed", map[string]interface{}{
 			rlog.LogKeyTopic:            topic,
diff --git a/producer/producer.go b/producer/producer.go
index 8ebb660..ef4ea25 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -72,7 +72,10 @@
 		options:    defaultOpts,
 	}
 	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
-
+	producer.options.ClientOptions.Namesrv, err = internal.GetNamesrv(producer.client.ClientID())
+	if err != nil {
+		return nil, err
+	}
 	producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)
 
 	return producer, nil