feat(core): fix concurrent read/write for broker version (#333)

- fix concurrent read/write for broker version 
Closes #334
diff --git a/internal/client.go b/internal/client.go
index 17c089b..b1ad3a6 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -452,15 +452,7 @@
 				return true
 			}
 			if response.Code == ResSuccess {
-				v, exist := c.namesrvs.brokerVersionMap.Load(brokerName)
-				var m map[string]int32
-				if exist {
-					m = v.(map[string]int32)
-				} else {
-					m = make(map[string]int32, 4)
-					c.namesrvs.brokerVersionMap.Store(brokerName, m)
-				}
-				m[brokerName] = int32(response.Version)
+				c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
 				rlog.Debug("send heart beat to broker success", map[string]interface{}{
 					"brokerName": brokerName,
 					"brokerId":   id,
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 27d38e8..3c9cf12 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -83,8 +83,10 @@
 	// brokerName -> *BrokerData
 	brokerAddressesMap sync.Map
 
-	// brokerName -> map[string]int32
-	brokerVersionMap sync.Map
+	// brokerName -> map[string]int32: brokerAddr -> version
+	brokerVersionMap map[string]map[string]int32
+	// lock for broker version read/write
+	brokerLock *sync.RWMutex
 
 	//subscribeInfoMap sync.Map
 	routeDataMap sync.Map
@@ -103,9 +105,11 @@
 	}
 	nameSrvClient := remote.NewRemotingClient()
 	return &namesrvs{
-		srvs:          addr,
-		lock:          new(sync.Mutex),
-		nameSrvClient: nameSrvClient,
+		srvs:             addr,
+		lock:             new(sync.Mutex),
+		nameSrvClient:    nameSrvClient,
+		brokerVersionMap: make(map[string]map[string]int32, 0),
+		brokerLock:       new(sync.RWMutex),
 	}, nil
 }
 
diff --git a/internal/route.go b/internal/route.go
index 5851f85..5c7b114 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -296,20 +296,29 @@
 	return publishInfo.MqList, nil
 }
 
-func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
+func (s *namesrvs) AddBrokerVersion(brokerName, brokerAddr string, version int32) {
+	s.brokerLock.Lock()
+	defer s.brokerLock.Unlock()
 
-	versions, exist := s.brokerVersionMap.Load(brokerName)
+	m, exist := s.brokerVersionMap[brokerName]
+	if !exist {
+		m = make(map[string]int32, 4)
+		s.brokerVersionMap[brokerName] = m
+	}
+	m[brokerAddr] = version
+}
+
+func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
+	s.brokerLock.RLock()
+	defer s.brokerLock.RUnlock()
+
+	versions, exist := s.brokerVersionMap[brokerName]
 
 	if !exist {
 		return 0
 	}
 
-	v, exist := versions.(map[string]int32)[brokerAddr]
-
-	if exist {
-		return v
-	}
-	return 0
+	return versions[brokerAddr]
 }
 
 func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
diff --git a/internal/route_test.go b/internal/route_test.go
index aa944b0..b24f312 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"sync"
 	"testing"
 	"time"
 
@@ -66,3 +67,19 @@
 		})
 	})
 }
+
+func TestAddBrokerVersion(t *testing.T) {
+	s := &namesrvs{}
+	s.brokerVersionMap = make(map[string]map[string]int32, 0)
+	s.brokerLock = new(sync.RWMutex)
+
+	v := s.findBrokerVersion("b1", "addr1")
+	assert.Equal(t, v, int32(0))
+
+	s.AddBrokerVersion("b1", "addr1", 1)
+	v = s.findBrokerVersion("b1", "addr1")
+	assert.Equal(t, v, int32(1))
+
+	v = s.findBrokerVersion("b1", "addr2")
+	assert.Equal(t, v, int32(0))
+}