blob: a1a3f792cbe718d1179e00ee61617a86a626021b [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"errors"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
const (
EnvNameServerAddr = "NAMESRV_ADDR"
requestTimeout = 3 * time.Second
defaultTopic = "TBW102"
defaultQueueNums = 4
MasterId = int64(0)
)
var (
ErrTopicNotExist = errors.New("topic not exist")
)
func (s *namesrvs) cleanOfflineBroker() {
// TODO optimize
s.lockNamesrv.Lock()
s.brokerAddressesMap.Range(func(key, value interface{}) bool {
brokerName := key.(string)
bd := value.(*BrokerData)
for k, v := range bd.BrokerAddresses {
isBrokerAddrExistInTopicRoute := false
s.routeDataMap.Range(func(key, value interface{}) bool {
trd := value.(*TopicRouteData)
for idx := range trd.BrokerDataList {
for _, v1 := range trd.BrokerDataList[idx].BrokerAddresses {
if v1 == v {
isBrokerAddrExistInTopicRoute = true
return false
}
}
}
return true
})
if !isBrokerAddrExistInTopicRoute {
delete(bd.BrokerAddresses, k)
rlog.Info("the broker: [name=%s, ID=%d, addr=%s,] is offline, remove it", map[string]interface{}{
"brokerName": brokerName,
"brokerID": k,
"brokerAddr": v,
})
}
}
if len(bd.BrokerAddresses) == 0 {
s.brokerAddressesMap.Delete(brokerName)
rlog.Info("the broker name's host is offline, remove it", map[string]interface{}{
"brokerName": brokerName,
})
}
return true
})
s.lockNamesrv.Unlock()
}
// key is topic, value is TopicPublishInfo
type TopicPublishInfo struct {
OrderTopic bool
HaveTopicRouterInfo bool
MqList []*primitive.MessageQueue
RouteData *TopicRouteData
TopicQueueIndex int32
}
func (info *TopicPublishInfo) isOK() (bIsTopicOk bool) {
return len(info.MqList) > 0
}
func (info *TopicPublishInfo) fetchQueueIndex() int {
length := len(info.MqList)
if length <= 0 {
return -1
}
qIndex := atomic.AddInt32(&info.TopicQueueIndex, 1)
return int(qIndex) % length
}
func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
return s.UpdateTopicRouteInfoWithDefault(topic, "", 0)
}
func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
var (
routeData *TopicRouteData
err error
)
t := topic
if len(defaultTopic) > 0 {
t = defaultTopic
}
routeData, err = s.queryTopicRouteInfoFromServer(t)
if err != nil {
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
if routeData == nil {
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil, false, err
}
if len(defaultTopic) > 0 {
for _, q := range routeData.QueueDataList {
if q.ReadQueueNums > defaultQueueNum {
q.ReadQueueNums = defaultQueueNum
q.WriteQueueNums = defaultQueueNum
}
}
}
oldRouteData, exist := s.routeDataMap.Load(topic)
changed := true
if exist {
changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
}
if changed {
s.routeDataMap.Store(topic, routeData)
rlog.Info("the topic route info changed", map[string]interface{}{
rlog.LogKeyTopic: topic,
rlog.LogKeyValueChangedFrom: oldRouteData,
rlog.LogKeyValueChangedTo: routeData.String(),
})
for _, brokerData := range routeData.BrokerDataList {
s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
}
}
return routeData.clone(), changed, nil
}
func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
for _, brokerData := range routeData.BrokerDataList {
s.brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
}
}
func (s *namesrvs) FindBrokerAddrByTopic(topic string) string {
v, exist := s.routeDataMap.Load(topic)
if !exist {
return ""
}
routeData := v.(*TopicRouteData)
if len(routeData.BrokerDataList) == 0 {
return ""
}
i := utils.AbsInt(rand.Int())
bd := routeData.BrokerDataList[i%len(routeData.BrokerDataList)]
addr := bd.BrokerAddresses[MasterId]
if addr == "" && len(bd.BrokerAddresses) > 0 {
i = i % len(bd.BrokerAddresses)
for _, v := range bd.BrokerAddresses {
if i <= 0 {
addr = v
break
}
i--
}
}
return addr
}
func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
bd, exist := s.brokerAddressesMap.Load(brokerName)
if !exist {
return ""
}
return bd.(*BrokerData).BrokerAddresses[MasterId]
}
func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
//slave = false
//found = false
)
v, exist := s.brokerAddressesMap.Load(brokerName)
if !exist {
return nil
}
data := v.(*BrokerData)
if len(data.BrokerAddresses) == 0 {
return nil
}
brokerAddr = data.BrokerAddresses[brokerId]
//for k, v := range data.BrokerAddresses {
// if v != "" {
// found = true
// if k != MasterId {
// slave = true
// }
// brokerAddr = v
// break
// }
//}
var result *FindBrokerResult
if brokerAddr != "" {
result = &FindBrokerResult{
BrokerAddr: brokerAddr,
Slave: brokerId != 0,
BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
}
}
return result
}
func (s *namesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
routeData, err := s.queryTopicRouteInfoFromServer(topic)
if err != nil {
return nil, err
}
mqs := make([]*primitive.MessageQueue, 0)
for _, qd := range routeData.QueueDataList {
if queueIsReadable(qd.Perm) {
for i := 0; i < qd.ReadQueueNums; i++ {
mqs = append(mqs, &primitive.MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
}
}
}
return mqs, nil
}
func (s *namesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
var (
err error
routeData *TopicRouteData
)
v, exist := s.routeDataMap.Load(topic)
if !exist {
routeData, err = s.queryTopicRouteInfoFromServer(topic)
if err != nil {
rlog.Error("queryTopicRouteInfoFromServer failed", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil, err
}
s.routeDataMap.Store(topic, routeData)
s.AddBroker(routeData)
} else {
routeData = v.(*TopicRouteData)
}
if err != nil {
return nil, err
}
publishInfo := s.routeData2PublishInfo(topic, routeData)
return publishInfo.MqList, nil
}
func (s *namesrvs) AddBrokerVersion(brokerName, brokerAddr string, version int32) {
s.brokerLock.Lock()
defer s.brokerLock.Unlock()
m, exist := s.brokerVersionMap[brokerName]
if !exist {
m = make(map[string]int32, 4)
s.brokerVersionMap[brokerName] = m
}
m[brokerAddr] = version
}
func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
s.brokerLock.RLock()
defer s.brokerLock.RUnlock()
versions, exist := s.brokerVersionMap[brokerName]
if !exist {
return 0
}
return versions[brokerAddr]
}
func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
request := &GetRouteInfoRequestHeader{
Topic: topic,
}
var (
response *remote.RemotingCommand
err error
)
for i := 0; i < s.Size(); i++ {
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)
if err == nil {
break
}
}
if err != nil {
rlog.Error("connect to namesrv failed.", map[string]interface{}{
"namesrv": s,
})
return nil, primitive.NewRemotingErr(err.Error())
}
switch response.Code {
case ResSuccess:
if response.Body == nil {
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
routeData := &TopicRouteData{}
err = routeData.decode(string(response.Body))
if err != nil {
rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
return routeData, nil
case ResTopicNotExist:
return nil, ErrTopicNotExist
default:
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
}
func (s *namesrvs) topicRouteDataIsChange(oldData *TopicRouteData, newData *TopicRouteData) bool {
if oldData == nil || newData == nil {
return true
}
oldDataCloned := oldData.clone()
newDataCloned := newData.clone()
sort.Slice(oldDataCloned.QueueDataList, func(i, j int) bool {
return strings.Compare(oldDataCloned.QueueDataList[i].BrokerName, oldDataCloned.QueueDataList[j].BrokerName) > 0
})
sort.Slice(oldDataCloned.BrokerDataList, func(i, j int) bool {
return strings.Compare(oldDataCloned.BrokerDataList[i].BrokerName, oldDataCloned.BrokerDataList[j].BrokerName) > 0
})
sort.Slice(newDataCloned.QueueDataList, func(i, j int) bool {
return strings.Compare(newDataCloned.QueueDataList[i].BrokerName, newDataCloned.QueueDataList[j].BrokerName) > 0
})
sort.Slice(newDataCloned.BrokerDataList, func(i, j int) bool {
return strings.Compare(newDataCloned.BrokerDataList[i].BrokerName, newDataCloned.BrokerDataList[j].BrokerName) > 0
})
return !oldDataCloned.equals(newDataCloned)
}
func (s *namesrvs) routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo {
publishInfo := &TopicPublishInfo{
RouteData: data,
OrderTopic: false,
}
if data.OrderTopicConf != "" {
brokers := strings.Split(data.OrderTopicConf, ";")
for _, broker := range brokers {
item := strings.Split(broker, ":")
nums, _ := strconv.Atoi(item[1])
for i := 0; i < nums; i++ {
mq := &primitive.MessageQueue{
Topic: topic,
BrokerName: item[0],
QueueId: i,
}
publishInfo.MqList = append(publishInfo.MqList, mq)
}
}
publishInfo.OrderTopic = true
return publishInfo
}
qds := data.QueueDataList
sort.Slice(qds, func(i, j int) bool {
return i-j >= 0
})
for _, qd := range qds {
if !queueIsWriteable(qd.Perm) {
continue
}
var bData *BrokerData
for _, bd := range data.BrokerDataList {
if bd.BrokerName == qd.BrokerName {
bData = bd
break
}
}
if bData == nil || bData.BrokerAddresses[MasterId] == "" {
continue
}
for i := 0; i < qd.WriteQueueNums; i++ {
mq := &primitive.MessageQueue{
Topic: topic,
BrokerName: qd.BrokerName,
QueueId: i,
}
publishInfo.MqList = append(publishInfo.MqList, mq)
}
}
return publishInfo
}
// TopicRouteData TopicRouteData
type TopicRouteData struct {
OrderTopicConf string
QueueDataList []*QueueData `json:"queueDatas"`
BrokerDataList []*BrokerData `json:"brokerDatas"`
}
func (routeData *TopicRouteData) decode(data string) error {
res := gjson.Parse(data)
err := jsoniter.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)
if err != nil {
return err
}
bds := res.Get("brokerDatas").Array()
routeData.BrokerDataList = make([]*BrokerData, len(bds))
for idx, v := range bds {
bd := &BrokerData{
BrokerName: v.Get("brokerName").String(),
Cluster: v.Get("cluster").String(),
BrokerAddresses: make(map[int64]string, 0),
}
addrs := v.Get("brokerAddrs").String()
strs := strings.Split(addrs[1:len(addrs)-1], ",")
if strs != nil {
for _, str := range strs {
i := strings.Index(str, ":")
if i < 0 {
continue
}
id, _ := strconv.ParseInt(str[0:i], 10, 64)
bd.BrokerAddresses[id] = strings.Replace(str[i+1:], "\"", "", -1)
}
}
routeData.BrokerDataList[idx] = bd
}
return nil
}
func (routeData *TopicRouteData) clone() *TopicRouteData {
cloned := &TopicRouteData{
OrderTopicConf: routeData.OrderTopicConf,
QueueDataList: make([]*QueueData, len(routeData.QueueDataList)),
BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)),
}
for index, value := range routeData.QueueDataList {
cloned.QueueDataList[index] = value
}
for index, value := range routeData.BrokerDataList {
cloned.BrokerDataList[index] = value
}
return cloned
}
func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
if len(routeData.BrokerDataList) != len(data.BrokerDataList) {
return false
}
if len(routeData.QueueDataList) != len(data.QueueDataList) {
return false
}
for idx := range routeData.BrokerDataList {
if !routeData.BrokerDataList[idx].Equals(data.BrokerDataList[idx]) {
return false
}
}
for idx := range routeData.QueueDataList {
if !routeData.QueueDataList[idx].Equals(data.QueueDataList[idx]) {
return false
}
}
return true
}
func (routeData *TopicRouteData) String() string {
data, _ := jsoniter.Marshal(routeData)
return string(data)
}
// QueueData QueueData
type QueueData struct {
BrokerName string `json:"brokerName"`
ReadQueueNums int `json:"readQueueNums"`
WriteQueueNums int `json:"writeQueueNums"`
Perm int `json:"perm"`
TopicSynFlag int `json:"topicSynFlag"`
}
func (q *QueueData) Equals(qd *QueueData) bool {
if q.BrokerName != qd.BrokerName {
return false
}
if q.ReadQueueNums != qd.ReadQueueNums {
return false
}
if q.WriteQueueNums != qd.WriteQueueNums {
return false
}
if q.Perm != qd.Perm {
return false
}
if q.TopicSynFlag != qd.TopicSynFlag {
return false
}
return true
}
// BrokerData BrokerData
type BrokerData struct {
Cluster string `json:"cluster"`
BrokerName string `json:"brokerName"`
BrokerAddresses map[int64]string `json:"brokerAddrs"`
brokerAddressesLock sync.RWMutex
}
func (b *BrokerData) Equals(bd *BrokerData) bool {
if b.Cluster != bd.Cluster {
return false
}
if b.BrokerName != bd.BrokerName {
return false
}
if len(b.BrokerAddresses) != len(bd.BrokerAddresses) {
return false
}
for k, v := range b.BrokerAddresses {
if bd.BrokerAddresses[k] != v {
return false
}
}
return true
}