blob: f5cd9ef65c26dc872a1237528d2d285d6b101d04 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernel
import (
"encoding/json"
"errors"
"fmt"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
"strings"
"time"
)
//MqClientManager MqClientManager
type MqClientManager struct {
//rocketMqManagerLock sync.Mutex
BootTimestamp int64
clientFactory *clientFactory
mqClient RocketMqClient
pullMessageController *PullMessageController
cleanExpireMsgController *cleanExpireMsgController
rebalanceControllr *rebalanceController
defaultProducerService *DefaultProducerService
}
//MqClientManagerInit create a MqClientManager instance
func MqClientManagerInit(clientConfig *rocketmqm.MqClientConfig) (rocketMqManager *MqClientManager) {
rocketMqManager = &MqClientManager{}
rocketMqManager.BootTimestamp = time.Now().Unix()
rocketMqManager.clientFactory = clientFactoryInit()
rocketMqManager.mqClient = mqClientInit(clientConfig, rocketMqManager.initClientRequestProcessor()) // todo todo todo
rocketMqManager.pullMessageController = newPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.cleanExpireMsgController = newCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.rebalanceControllr = newRebalanceController(rocketMqManager.clientFactory)
return
}
//Start start MqClientManager
func (m *MqClientManager) Start() {
//d.sendHeartbeatToAllBrokerWithLock()//we should send heartbeat first todo check
m.startAllScheduledTask()
}
//RegisterProducer register producer to this MqClientManager
func (m *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
producer.producerService = newDefaultProducerService(producer.producerGroup, producer.ProducerConfig, m.mqClient)
m.clientFactory.producerTable[producer.producerGroup] = producer
return
}
//RegisterConsumer register consumer to this MqClientManager
func (m *MqClientManager) RegisterConsumer(consumer *DefaultMQPushConsumer) {
if m.defaultProducerService == nil {
m.defaultProducerService = newDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, rocketmqm.NewProducerConfig(), m.mqClient)
}
consumer.mqClient = m.mqClient
consumer.offsetStore = remoteOffsetStoreInit(consumer.consumerGroup, m.mqClient)
m.clientFactory.consumerTable[consumer.consumerGroup] = consumer
consumer.rebalance = newRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)
consumer.consumeMessageService.init(consumer.consumerGroup, m.mqClient, consumer.offsetStore, m.defaultProducerService, consumer.ConsumerConfig)
return
}
func (m *MqClientManager) initClientRequestProcessor() (clientRequestProcessor remoting.ClientRequestProcessor) {
clientRequestProcessor = func(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
switch cmd.Code {
case remoting.CHECK_TRANSACTION_STATE:
glog.V(2).Info("receive_request_code CHECK_TRANSACTION_STATE")
break
case remoting.NOTIFY_CONSUMER_IDS_CHANGED:
glog.V(2).Info("receive_request_code NOTIFY_CONSUMER_IDS_CHANGED")
m.rebalanceControllr.doRebalance()
break
case remoting.RESET_CONSUMER_CLIENT_OFFSET: // struct json key supported
m.resetConsumerClientOffset(cmd)
break
case remoting.GET_CONSUMER_STATUS_FROM_CLIENT: // useless we can use GET_CONSUMER_RUNNING_INFO instead
glog.V(2).Info("receive_request_code GET_CONSUMER_STATUS_FROM_CLIENT")
break
case remoting.GET_CONSUMER_RUNNING_INFO:
response = m.getConsumerRunningInfo(cmd)
break
case remoting.CONSUME_MESSAGE_DIRECTLY:
response = m.consumeMessageDirectly(cmd)
break
default:
glog.Error("illeage requestCode ", cmd.Code)
}
return
}
return
}
func (m *MqClientManager) consumeMessageDirectly(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
glog.V(2).Info("receive_request_code CONSUME_MESSAGE_DIRECTLY")
var consumeMessageDirectlyResultRequestHeader = &header.ConsumeMessageDirectlyResultRequestHeader{}
if cmd.ExtFields != nil {
consumeMessageDirectlyResultRequestHeader.FromMap(cmd.ExtFields)
messageExt := &decodeMessage(cmd.Body)[0]
glog.V(2).Info("op=look", messageExt)
defaultMQPushConsumer := m.clientFactory.consumerTable[consumeMessageDirectlyResultRequestHeader.ConsumerGroup]
consumeResult, err := defaultMQPushConsumer.consumeMessageService.consumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
if err != nil {
return
}
jsonByte, err := json.Marshal(consumeResult)
if err != nil {
glog.Error(err)
return
}
response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
}
return
}
func (m *MqClientManager) getConsumerRunningInfo(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
glog.V(2).Info("receive_request_code GET_CONSUMER_RUNNING_INFO")
var getConsumerRunningInfoRequestHeader = &header.GetConsumerRunningInfoRequestHeader{}
if cmd.ExtFields != nil {
getConsumerRunningInfoRequestHeader.FromMap(cmd.ExtFields) //change map[string]interface{} into CustomerHeader struct
consumerRunningInfo := model.ConsumerRunningInfo{}
consumerRunningInfo.Properties = map[string]string{}
defaultMQPushConsumer := m.clientFactory.consumerTable[getConsumerRunningInfoRequestHeader.ConsumerGroup]
consumerConfigMap := util.Struct2Map(defaultMQPushConsumer.ConsumerConfig)
for key, value := range consumerConfigMap {
consumerRunningInfo.Properties[key] = fmt.Sprintf("%v", value)
}
consumerRunningInfo.Properties["PROP_NAMESERVER_ADDR"] = strings.Join(defaultMQPushConsumer.mqClient.getRemotingClient().GetNamesrvAddrList(), ";")
consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.getMqTableInfo()
glog.V(2).Info("op=look consumerRunningInfo", consumerRunningInfo)
jsonByte, err := consumerRunningInfo.Encode()
glog.V(2).Info("op=enCode jsonByte", string(jsonByte))
if err != nil {
glog.Error(err)
return
}
response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
}
return
}
func (m *MqClientManager) resetConsumerClientOffset(cmd *remoting.RemotingCommand) {
glog.V(2).Info("receive_request_code RESET_CONSUMER_CLIENT_OFFSET")
glog.V(2).Info("op=look cmd body", string(cmd.Body))
var resetOffsetRequestHeader = &header.ResetOffsetRequestHeader{}
if cmd.ExtFields != nil {
resetOffsetRequestHeader.FromMap(cmd.ExtFields) //change map[string]interface{} into CustomerHeader struct
glog.V(2).Info("op=look ResetOffsetRequestHeader", resetOffsetRequestHeader)
resetOffsetBody := &model.ResetOffsetBody{}
err := resetOffsetBody.Decode(cmd.Body)
if err != nil {
return
}
glog.V(2).Info("op=look resetOffsetBody xxxxx", resetOffsetBody)
m.resetConsumerOffset(resetOffsetRequestHeader.Topic, resetOffsetRequestHeader.Group, resetOffsetBody.OffsetTable)
}
}
func (m *MqClientManager) resetConsumerOffset(topic, group string, offsetTable map[rocketmqm.MessageQueue]int64) {
consumer := m.clientFactory.consumerTable[group]
if consumer == nil {
glog.Error("resetConsumerOffset because consumer not online,group=", group)
return
}
consumer.resetOffset(offsetTable)
}
type clientFactory struct {
producerTable map[string]*DefaultMQProducer //group|RocketMQProducer
consumerTable map[string]*DefaultMQPushConsumer //group|Consumer
}
func clientFactoryInit() (clientFactoryInstance *clientFactory) {
clientFactoryInstance = &clientFactory{}
clientFactoryInstance.producerTable = make(map[string]*DefaultMQProducer)
clientFactoryInstance.consumerTable = make(map[string]*DefaultMQPushConsumer)
return
}
//heart beat
func (m *MqClientManager) sendHeartbeatToAllBrokerWithLock() error {
heartbeatData := m.prepareHeartbeatData()
if len(heartbeatData.ConsumerDataSet) == 0 {
return errors.New("send heartbeat error")
}
m.mqClient.sendHeartbeatToAllBroker(heartbeatData)
return nil
}
//routeInfo
func (m *MqClientManager) updateTopicRouteInfoFromNameServer() {
var topicSet []string
for _, consumer := range m.clientFactory.consumerTable {
for key := range consumer.subscription {
topicSet = append(topicSet, key)
}
}
topicSet = append(topicSet, m.mqClient.getPublishTopicList()...)
for _, topic := range topicSet {
m.mqClient.updateTopicRouteInfoFromNameServer(topic)
}
}
func (m *MqClientManager) prepareHeartbeatData() *model.HeartbeatData {
heartbeatData := new(model.HeartbeatData)
heartbeatData.ClientId = m.mqClient.getClientId()
heartbeatData.ConsumerDataSet = make([]*model.ConsumerData, 0)
heartbeatData.ProducerDataSet = make([]*model.ProducerData, 0)
for group, consumer := range m.clientFactory.consumerTable {
consumerData := new(model.ConsumerData)
consumerData.GroupName = group
consumerData.ConsumeType = consumer.consumeType
consumerData.ConsumeFromWhere = consumer.ConsumerConfig.ConsumeFromWhere
consumerData.MessageModel = consumer.messageModel
consumerData.SubscriptionDataSet = consumer.Subscriptions()
consumerData.UnitMode = consumer.unitMode
heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData)
}
for group := range m.clientFactory.producerTable {
producerData := new(model.ProducerData)
producerData.GroupName = group
heartbeatData.ProducerDataSet = append(heartbeatData.ProducerDataSet, producerData)
}
return heartbeatData
}