blob: 9dbecc6974d85aed7887cfe5bb4ba0c5763ee4db [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernel
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/apache/rocketmq-client-go/remote"
"github.com/apache/rocketmq-client-go/rlog"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
defaultTraceRegionID = "DefaultRegion"
// tracing message switch
_TranceOff = "false"
// Pulling topic information interval from the named server
_PullNameServerInterval = 30 * time.Second
// Pulling topic information interval from the named server
_HeartbeatBrokerInterval = 30 * time.Second
// Offset persistent interval for consumer
_PersistOffset = 5 * time.Second
// Rebalance interval
_RebalanceInterval = 10 * time.Second
)
var (
ErrServiceState = errors.New("service state is not running, please check")
_VIPChannelEnable = false
)
func init() {
if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
value, err := strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
if err == nil {
_VIPChannelEnable = value
}
}
}
type ClientOption struct {
NameServerAddr string
ClientIP string
InstanceName string
UnitMode bool
UnitName string
VIPChannelEnabled bool
UseTLS bool
}
func (opt *ClientOption) ChangeInstanceNameToPID() {
if opt.InstanceName == "DEFAULT" {
opt.InstanceName = strconv.Itoa(os.Getegid())
}
}
func (opt *ClientOption) String() string {
return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
}
type InnerProducer interface {
PublishTopicList() []string
UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
IsPublishTopicNeedUpdate(topic string) bool
//GetTransactionListener() TransactionListener
IsUnitMode() bool
}
type InnerConsumer interface {
PersistConsumerOffset()
UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
IsSubscribeTopicNeedUpdate(topic string) bool
SubscriptionDataList() []*SubscriptionData
Rebalance()
IsUnitMode() bool
}
type RMQClient struct {
option ClientOption
// group -> InnerProducer
producerMap sync.Map
// group -> InnerConsumer
consumerMap sync.Map
once sync.Once
remoteClient *remote.RemotingClient
hbMutex sync.Mutex
}
var clientMap sync.Map
func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
client := &RMQClient{
option: option,
remoteClient: remote.NewRemotingClient(),
}
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
if !loaded {
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand) *remote.RemotingCommand {
rlog.Infof("receive broker's notification, the consumer group: %s", req.ExtFields["consumerGroup"])
client.RebalanceImmediately()
return nil
})
}
return actual.(*RMQClient)
}
func (c *RMQClient) Start() {
c.once.Do(func() {
// TODO fetchNameServerAddr
go func() {}()
// schedule update route info
go func() {
// delay
time.Sleep(50 * time.Millisecond)
for {
c.UpdateTopicRouteInfo()
time.Sleep(_PullNameServerInterval)
}
}()
// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
go func() {
for {
cleanOfflineBroker()
c.SendHeartbeatToAllBrokerWithLock()
time.Sleep(_HeartbeatBrokerInterval)
}
}()
// schedule persist offset
go func() {
//time.Sleep(10 * time.Second)
for {
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
consumer.PersistConsumerOffset()
return true
})
time.Sleep(_PersistOffset)
}
}()
go func() {
for {
c.RebalanceImmediately()
time.Sleep(_RebalanceInterval)
}
}()
})
}
func (c *RMQClient) Shutdown() {
// TODO
}
func (c *RMQClient) ClientID() string {
id := c.option.ClientIP + "@" + c.option.InstanceName
if c.option.UnitName != "" {
id += "@" + c.option.UnitName
}
return id
}
func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
}
func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error {
return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
}
func (c *RMQClient) CheckClientInBroker() {
}
// TODO
func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
c.hbMutex.Lock()
defer c.hbMutex.Unlock()
hbData := &heartbeatData{
ClientId: c.ClientID(),
}
pData := make([]producerData, 0)
c.producerMap.Range(func(key, value interface{}) bool {
pData = append(pData, producerData(key.(string)))
return true
})
cData := make([]consumerData, 0)
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
cData = append(cData, consumerData{
GroupName: key.(string),
CType: "PUSH",
MessageModel: "CLUSTERING",
Where: "CONSUME_FROM_FIRST_OFFSET",
UnitMode: consumer.IsUnitMode(),
SubscriptionDatas: consumer.SubscriptionDataList(),
})
return true
})
hbData.ProducerDatas = pData
hbData.ConsumerDatas = cData
if len(pData) == 0 && len(cData) == 0 {
rlog.Info("sending heartbeat, but no consumer and no consumer")
return
}
brokerAddressesMap.Range(func(key, value interface{}) bool {
brokerName := key.(string)
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
response, err := c.remoteClient.InvokeSync(addr, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send heart beat to broker error: %s", err.Error())
return true
}
if response.Code == ResSuccess {
v, exist := brokerVersionMap.Load(brokerName)
var m map[string]int32
if exist {
m = v.(map[string]int32)
} else {
m = make(map[string]int32, 4)
brokerVersionMap.Store(brokerName, m)
}
m[brokerName] = int32(response.Version)
rlog.Infof("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
}
}
return true
})
}
func (c *RMQClient) UpdateTopicRouteInfo() {
publishTopicSet := make(map[string]bool, 0)
c.producerMap.Range(func(key, value interface{}) bool {
producer := value.(InnerProducer)
list := producer.PublishTopicList()
for idx := range list {
publishTopicSet[list[idx]] = true
}
return true
})
for topic := range publishTopicSet {
c.UpdatePublishInfo(topic, UpdateTopicRouteInfo(topic))
}
subscribedTopicSet := make(map[string]bool, 0)
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
list := consumer.SubscriptionDataList()
for idx := range list {
if !strings.HasPrefix(list[idx].Topic, RetryGroupTopicPrefix) {
subscribedTopicSet[list[idx].Topic] = true
}
}
return true
})
for topic := range subscribedTopicSet {
c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
}
}
// SendMessageAsync send message with batch by async
func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
msgs []*Message, f func(result *SendResult)) error {
return nil
}
func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with oneway error: %v", err)
}
return nil, err
}
func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult {
var status SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
status = SendFlushDiskTimeout
case ResFlushSlaveTimeout:
status = SendFlushSlaveTimeout
case ResSlaveNotAvailable:
status = SendSlaveNotAvailable
case ResSuccess:
status = SendOK
default:
// TODO process unknown code
}
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
msgIDs = append(msgIDs, msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
}
regionId := cmd.ExtFields[PropertyMsgRegion]
trace := cmd.ExtFields[PropertyTraceSwitch]
if regionId == "" {
regionId = defaultTraceRegionID
}
qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
return &SendResult{
Status: status,
MsgID: cmd.ExtFields["msgId"],
OffsetMsgID: cmd.ExtFields["msgId"],
MessageQueue: &MessageQueue{
Topic: msgs[0].Topic,
BrokerName: brokerName,
QueueId: qId,
},
QueueOffset: off,
//TransactionID: sendResponse.TransactionId,
RegionID: regionId,
TraceOn: trace != "" && trace != _TranceOff,
}
}
// PullMessage with sync
func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
return nil, err
}
return c.processPullResponse(res)
}
func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
pullResult := &PullResult{}
switch response.Code {
case ResSuccess:
pullResult.Status = PullFound
case ResPullNotFound:
pullResult.Status = PullNoNewMsg
case ResPullRetryImmediately:
pullResult.Status = PullNoMsgMatched
case ResPullOffsetMoved:
pullResult.Status = PullOffsetIllegal
default:
return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
}
v, exist := response.ExtFields["maxOffset"]
if exist {
pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
}
v, exist = response.ExtFields["minOffset"]
if exist {
pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
}
v, exist = response.ExtFields["nextBeginOffset"]
if exist {
pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
}
v, exist = response.ExtFields["suggestWhichBrokerId"]
if exist {
pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
}
pullResult.messageExts = decodeMessage(response.Body)
return pullResult, nil
}
// PullMessageAsync pull message async
func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error {
return nil
}
func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
c.consumerMap.Store(group, consumer)
return nil
}
func (c *RMQClient) UnregisterConsumer(group string) {
}
func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
c.producerMap.Store(group, producer)
}
func (c *RMQClient) UnregisterProducer(group string) {
}
func (c *RMQClient) SelectProducer(group string) InnerProducer {
return nil
}
func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
return nil
}
func (c *RMQClient) RebalanceImmediately() {
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
consumer.Rebalance()
return true
})
}
func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
if data == nil {
return
}
if !c.isNeedUpdatePublishInfo(topic) {
return
}
c.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
publishInfo := routeData2PublishInfo(topic, data)
publishInfo.HaveTopicRouterInfo = true
p.UpdateTopicPublishInfo(topic, publishInfo)
return true
})
}
func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
var result bool
c.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
if p.IsPublishTopicNeedUpdate(topic) {
result = true
return false
}
return true
})
return result
}
func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
if data == nil {
return
}
if !c.isNeedUpdateSubscribeInfo(topic) {
return
}
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
// TODO
consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
return true
})
}
func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
var result bool
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
if consumer.IsSubscribeTopicNeedUpdate(topic) {
result = true
return false
}
return true
})
return result
}
func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue {
list := make([]*MessageQueue, 0)
for idx := range data.QueueDataList {
qd := data.QueueDataList[idx]
if queueIsReadable(qd.Perm) {
for i := 0; i < qd.ReadQueueNums; i++ {
list = append(list, &MessageQueue{
Topic: topic,
BrokerName: qd.BrokerName,
QueueId: i,
})
}
}
}
return list
}
func encodeMessages(message []*Message) []byte {
var buffer bytes.Buffer
index := 0
for index < len(message) {
buffer.Write(message[index].Body)
}
return buffer.Bytes()
}
func brokerVIPChannel(brokerAddr string) string {
if !_VIPChannelEnable {
return brokerAddr
}
var brokerAddrNew strings.Builder
ipAndPort := strings.Split(brokerAddr, ":")
port, err := strconv.Atoi(ipAndPort[1])
if err != nil {
return ""
}
brokerAddrNew.WriteString(ipAndPort[0])
brokerAddrNew.WriteString(":")
brokerAddrNew.WriteString(strconv.Itoa(port - 2))
return brokerAddrNew.String()
}