feat(core): fix concurrent read/write for broker version (#333)
- fix concurrent read/write for broker version
Closes #334
diff --git a/internal/client.go b/internal/client.go
index 17c089b..b1ad3a6 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -452,15 +452,7 @@
return true
}
if response.Code == ResSuccess {
- v, exist := c.namesrvs.brokerVersionMap.Load(brokerName)
- var m map[string]int32
- if exist {
- m = v.(map[string]int32)
- } else {
- m = make(map[string]int32, 4)
- c.namesrvs.brokerVersionMap.Store(brokerName, m)
- }
- m[brokerName] = int32(response.Version)
+ c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
rlog.Debug("send heart beat to broker success", map[string]interface{}{
"brokerName": brokerName,
"brokerId": id,
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 27d38e8..3c9cf12 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -83,8 +83,10 @@
// brokerName -> *BrokerData
brokerAddressesMap sync.Map
- // brokerName -> map[string]int32
- brokerVersionMap sync.Map
+ // brokerName -> map[string]int32: brokerAddr -> version
+ brokerVersionMap map[string]map[string]int32
+ // lock for broker version read/write
+ brokerLock *sync.RWMutex
//subscribeInfoMap sync.Map
routeDataMap sync.Map
@@ -103,9 +105,11 @@
}
nameSrvClient := remote.NewRemotingClient()
return &namesrvs{
- srvs: addr,
- lock: new(sync.Mutex),
- nameSrvClient: nameSrvClient,
+ srvs: addr,
+ lock: new(sync.Mutex),
+ nameSrvClient: nameSrvClient,
+ brokerVersionMap: make(map[string]map[string]int32, 0),
+ brokerLock: new(sync.RWMutex),
}, nil
}
diff --git a/internal/route.go b/internal/route.go
index 5851f85..5c7b114 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -296,20 +296,29 @@
return publishInfo.MqList, nil
}
-func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
+func (s *namesrvs) AddBrokerVersion(brokerName, brokerAddr string, version int32) {
+ s.brokerLock.Lock()
+ defer s.brokerLock.Unlock()
- versions, exist := s.brokerVersionMap.Load(brokerName)
+ m, exist := s.brokerVersionMap[brokerName]
+ if !exist {
+ m = make(map[string]int32, 4)
+ s.brokerVersionMap[brokerName] = m
+ }
+ m[brokerAddr] = version
+}
+
+func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
+ s.brokerLock.RLock()
+ defer s.brokerLock.RUnlock()
+
+ versions, exist := s.brokerVersionMap[brokerName]
if !exist {
return 0
}
- v, exist := versions.(map[string]int32)[brokerAddr]
-
- if exist {
- return v
- }
- return 0
+ return versions[brokerAddr]
}
func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
diff --git a/internal/route_test.go b/internal/route_test.go
index aa944b0..b24f312 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -19,6 +19,7 @@
import (
"context"
+ "sync"
"testing"
"time"
@@ -66,3 +67,19 @@
})
})
}
+
+func TestAddBrokerVersion(t *testing.T) {
+ s := &namesrvs{}
+ s.brokerVersionMap = make(map[string]map[string]int32, 0)
+ s.brokerLock = new(sync.RWMutex)
+
+ v := s.findBrokerVersion("b1", "addr1")
+ assert.Equal(t, v, int32(0))
+
+ s.AddBrokerVersion("b1", "addr1", 1)
+ v = s.findBrokerVersion("b1", "addr1")
+ assert.Equal(t, v, int32(1))
+
+ v = s.findBrokerVersion("b1", "addr2")
+ assert.Equal(t, v, int32(0))
+}