[ISSUE #631] Support Consuming from Slave
Co-authored-by: zhangxu16 <zhangxu16@xiaomi.com>
diff --git a/internal/route.go b/internal/route.go
index 09b6e53..7b27c9d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -219,10 +219,12 @@
func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
- //slave = false
- //found = false
+ slave = false
+ found = false
)
+ rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil)
+
v, exist := s.brokerAddressesMap.Load(brokerName)
if !exist {
@@ -234,22 +236,40 @@
}
brokerAddr = data.BrokerAddresses[brokerId]
- //for k, v := range data.BrokerAddresses {
- // if v != "" {
- // found = true
- // if k != MasterId {
- // slave = true
- // }
- // brokerAddr = v
- // break
- // }
- //}
+ slave = brokerId != MasterId
+ if brokerAddr != "" {
+ found = true
+ }
+
+ // not found && read from slave, try again use next brokerId
+ if !found && slave {
+ rlog.Debug("Not found broker addr and slave "+strconv.FormatBool(slave), nil)
+ brokerAddr = data.BrokerAddresses[brokerId+1]
+ found = brokerAddr != ""
+ }
+
+ // still not found && cloud use other broker addr, find anyone in BrokerAddresses
+ if !found && !onlyThisBroker {
+ rlog.Debug("STILL Not found broker addr", nil)
+ for k, v := range data.BrokerAddresses {
+ if v != "" {
+ brokerAddr = v
+ found = true
+ slave = k != MasterId
+ break
+ }
+ }
+ }
+
+ if found {
+ rlog.Debug("Find broker addr "+brokerAddr, nil)
+ }
var result *FindBrokerResult
- if brokerAddr != "" {
+ if found {
result = &FindBrokerResult{
BrokerAddr: brokerAddr,
- Slave: brokerId != 0,
+ Slave: slave,
BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
}
}
diff --git a/internal/route_test.go b/internal/route_test.go
index c9b65f0..ded7780 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -82,3 +82,56 @@
v = s.findBrokerVersion("b1", "addr2")
assert.Equal(t, v, int32(0))
}
+
+func TestFindBrokerAddressInSubscribe(t *testing.T) {
+ s := &namesrvs{}
+ s.brokerVersionMap = make(map[string]map[string]int32, 0)
+ s.brokerLock = new(sync.RWMutex)
+
+ brokerDataRaft1 := &BrokerData{
+ Cluster: "cluster",
+ BrokerName: "raft01",
+ BrokerAddresses: map[int64]string{
+ 0: "127.0.0.1:10911",
+ 1: "127.0.0.1:10912",
+ 2: "127.0.0.1:10913",
+ },
+ }
+ s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1)
+ brokerDataRaft2 := &BrokerData{
+ Cluster: "cluster",
+ BrokerName: "raft02",
+ BrokerAddresses: map[int64]string{
+ 0: "127.0.0.1:10911",
+ 2: "127.0.0.1:10912",
+ 3: "127.0.0.1:10913",
+ },
+ }
+ s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2)
+
+ Convey("Request master broker", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[0])
+ assert.Equal(t, result.Slave, false)
+ })
+
+ Convey("Request slave broker from normal broker group", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[1])
+ assert.Equal(t, result.Slave, true)
+ })
+
+ Convey("Request slave broker from non normal broker group", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft2.BrokerAddresses[2])
+ assert.Equal(t, result.Slave, true)
+ })
+
+ Convey("Request not exist broker", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false)
+ assert.NotNil(t, result)
+ })
+}