blob: 7903116040267ee642c59dd42bf9abd186183706 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rocketmq
import (
"encoding/json"
"errors"
"fmt"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
"github.com/golang/glog"
"strings"
"sync"
"time"
)
// MqClientManager is the client-side runtime that producers and consumers are
// registered with (see RegistProducer and RegistConsumer). The original note
// points at the Java client class it corresponds to:
// com.alibaba.rocketmq.client.impl.factory.MQClientInstance.
type MqClientManager struct {
// NOTE(review): not locked by any code visible in this part of the file.
rocketMqManagerLock sync.Mutex
//ClientId string
// BootTimestamp is the Unix time in seconds captured by MqClientManagerInit.
BootTimestamp int64
// clientFactory holds the producer and consumer tables, keyed by group name.
clientFactory *ClientFactory
// NOTE(review): NamesrvLock and HeartBeatLock are not taken by any code
// visible in this part of the file - confirm where they are used.
NamesrvLock sync.Mutex
HeartBeatLock sync.Mutex
// mqClient is shared by every producer and consumer registered here.
mqClient service.RocketMqClient
// Fields of the Java class, kept commented out for reference:
//private final ClientRemotingProcessor clientRemotingProcessor;
// private final PullMessageService pullMessageService;
//private final RebalanceService rebalanceService;
// private final ConsumerStatsManager consumerStatsManager;
// private final AtomicLong storeTimesTotal = new AtomicLong(0);
// NOTE(review): ServiceState is never assigned in the visible code; its
// value semantics must be confirmed elsewhere.
ServiceState int
// The controllers live on the manager because they need access to all
// registered producers and consumers.
pullMessageController *PullMessageController
cleanExpireMsgController *CleanExpireMsgController
rebalanceControllr *RebalanceController
// defaultProducerService is created lazily by RegistConsumer and handed to
// each consumer's consume-message service.
defaultProducerService *service.DefaultProducerService // used to send messages back
}
// MqClientManagerInit builds a manager from clientConfig: it stamps the boot
// time, creates empty producer/consumer tables, creates the shared MQ client
// with this manager's request processor, and finally constructs the pull,
// clean-expired-message and rebalance controllers.
func MqClientManagerInit(clientConfig *config.ClientConfig) (rocketMqManager *MqClientManager) {
	rocketMqManager = &MqClientManager{
		BootTimestamp: time.Now().Unix(),
		clientFactory: ClientFactoryInit(),
	}

	// The processor closes over the manager, so the manager must exist
	// before the client is created.
	// TODO carried over from the original source (no detail was given).
	requestProcessor := rocketMqManager.InitClientRequestProcessor()
	rocketMqManager.mqClient = service.MqClientInit(clientConfig, requestProcessor)

	sharedClient, tables := rocketMqManager.mqClient, rocketMqManager.clientFactory
	rocketMqManager.pullMessageController = NewPullMessageController(sharedClient, tables)
	rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(sharedClient, tables)
	rocketMqManager.rebalanceControllr = NewRebalanceController(tables)
	return rocketMqManager
}
// InitClientRequestProcessor returns the callback that handles requests pushed
// to this client. The callback switches on cmd.Code:
//
//   - CHECK_TRANSACTION_STATE: logged only (not implemented in this version).
//   - NOTIFY_CONSUMER_IDS_CHANGED: triggers a rebalance.
//   - RESET_CONSUMER_CLIENT_OFFSET: decodes the header and body and resets the
//     offsets of the addressed consumer group.
//   - GET_CONSUMER_STATUS_FROM_CLIENT: logged only; GET_CONSUMER_RUNNING_INFO
//     can be used instead.
//   - GET_CONSUMER_RUNNING_INFO: replies with the encoded running info of the
//     addressed consumer group.
//   - CONSUME_MESSAGE_DIRECTLY: consumes the first message decoded from the
//     body and replies with the JSON-encoded result.
//
// Any other code is logged as an error. The callback returns a non-nil
// response only for the two replying codes and only on success; every other
// path, including decode/encode failures and unknown consumer groups, returns
// nil after logging.
func (self *MqClientManager) InitClientRequestProcessor() (clientRequestProcessor remoting.ClientRequestProcessor) {
	clientRequestProcessor = func(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
		switch cmd.Code {
		case remoting.CHECK_TRANSACTION_STATE:
			glog.V(2).Info("receive_request_code CHECK_TRANSACTION_STATE")
			// todo this version don't impl this

		case remoting.NOTIFY_CONSUMER_IDS_CHANGED:
			glog.V(1).Info("receive_request_code NOTIFY_CONSUMER_IDS_CHANGED")
			self.rebalanceControllr.doRebalance()

		case remoting.RESET_CONSUMER_CLIENT_OFFSET: // struct json key supported
			glog.V(2).Info("receive_request_code RESET_CONSUMER_CLIENT_OFFSET")
			glog.V(2).Info("op=look cmd body", string(cmd.Body))
			if cmd.ExtFields == nil {
				return nil
			}
			resetOffsetRequestHeader := &header.ResetOffsetRequestHeader{}
			// Convert the generic ext-field map into the typed header.
			resetOffsetRequestHeader.FromMap(cmd.ExtFields)
			glog.V(2).Info("op=look ResetOffsetRequestHeader", resetOffsetRequestHeader)
			resetOffsetBody := &model.ResetOffsetBody{}
			if err := resetOffsetBody.Decode(cmd.Body); err != nil {
				// Previously dropped silently; log so a bad body is diagnosable.
				glog.Error(err)
				return nil
			}
			glog.V(2).Info("op=look resetOffsetBody xxxxx", resetOffsetBody)
			self.resetConsumerOffset(resetOffsetRequestHeader.Topic, resetOffsetRequestHeader.Group, resetOffsetBody.OffsetTable)

		case remoting.GET_CONSUMER_STATUS_FROM_CLIENT: // useless we can use GET_CONSUMER_RUNNING_INFO instead
			glog.V(2).Info("receive_request_code GET_CONSUMER_STATUS_FROM_CLIENT")

		case remoting.GET_CONSUMER_RUNNING_INFO:
			glog.V(2).Info("receive_request_code GET_CONSUMER_RUNNING_INFO")
			if cmd.ExtFields == nil {
				return nil
			}
			getConsumerRunningInfoRequestHeader := &header.GetConsumerRunningInfoRequestHeader{}
			// Convert the generic ext-field map into the typed header.
			getConsumerRunningInfoRequestHeader.FromMap(cmd.ExtFields)
			defaultMQPushConsumer := self.clientFactory.ConsumerTable[getConsumerRunningInfoRequestHeader.ConsumerGroup]
			if defaultMQPushConsumer == nil {
				// A lookup miss yields a nil pointer; dereferencing it below
				// would panic inside the request handler.
				glog.Error("getConsumerRunningInfo failed because consumer not online,group=", getConsumerRunningInfoRequestHeader.ConsumerGroup)
				return nil
			}
			consumerRunningInfo := model.ConsumerRunningInfo{}
			consumerRunningInfo.Properties = map[string]string{}
			consumerConfigMap := structs.Map(defaultMQPushConsumer.ConsumerConfig) // todo test
			for key, value := range consumerConfigMap {
				consumerRunningInfo.Properties[key] = fmt.Sprintf("%v", value)
			}
			consumerRunningInfo.Properties["PROP_NAMESERVER_ADDR"] = strings.Join(defaultMQPushConsumer.mqClient.GetRemotingClient().GetNamesrvAddrList(), ";")
			consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.GetMqTableInfo()
			glog.V(2).Info("op=look consumerRunningInfo", consumerRunningInfo)
			jsonByte, err := consumerRunningInfo.Encode()
			if err != nil {
				glog.Error(err)
				return nil
			}
			glog.V(2).Info("op=enCode jsonByte", string(jsonByte))
			response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)

		case remoting.CONSUME_MESSAGE_DIRECTLY:
			glog.V(2).Info("receive_request_code CONSUME_MESSAGE_DIRECTLY")
			if cmd.ExtFields == nil {
				return nil
			}
			consumeMessageDirectlyResultRequestHeader := &header.ConsumeMessageDirectlyResultRequestHeader{}
			consumeMessageDirectlyResultRequestHeader.FromMap(cmd.ExtFields)
			decodedMessages := DecodeMessage(cmd.Body)
			if len(decodedMessages) == 0 {
				// Indexing [0] on an empty result would panic.
				glog.Error("consumeMessageDirectly failed because no message could be decoded from the request body")
				return nil
			}
			messageExt := &decodedMessages[0]
			glog.V(2).Info("op=look", messageExt)
			defaultMQPushConsumer := self.clientFactory.ConsumerTable[consumeMessageDirectlyResultRequestHeader.ConsumerGroup]
			if defaultMQPushConsumer == nil {
				glog.Error("consumeMessageDirectly failed because consumer not online,group=", consumeMessageDirectlyResultRequestHeader.ConsumerGroup)
				return nil
			}
			consumeResult, err := defaultMQPushConsumer.consumeMessageService.ConsumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
			if err != nil {
				// Previously dropped silently.
				glog.Error(err)
				return nil
			}
			jsonByte, err := json.Marshal(consumeResult)
			if err != nil {
				glog.Error(err)
				return nil
			}
			response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)

		default:
			glog.Error("illeage requestCode ", cmd.Code)
		}
		return response
	}
	return clientRequestProcessor
}
// RegistProducer attaches a producer to this manager: it gives the producer a
// producer service backed by the shared MQ client and records the producer in
// the producer table under its group name. Registering a second producer with
// the same group overwrites the earlier table entry.
func (self *MqClientManager) RegistProducer(producer *DefaultMQProducer) {
	group := producer.producerGroup
	producer.producerService = service.NewDefaultProducerService(group, producer.ProducerConfig, self.mqClient)
	self.clientFactory.ProducerTable[group] = producer
}
// resetConsumerOffset hands offsetTable to the consumer registered under
// group. If no such consumer is registered, the request is logged as an error
// and dropped. The topic argument is accepted but not consulted here.
func (self *MqClientManager) resetConsumerOffset(topic, group string, offsetTable map[model.MessageQueue]int64) {
	if target := self.clientFactory.ConsumerTable[group]; target != nil {
		target.resetOffset(offsetTable)
		return
	}
	glog.Error("resetConsumerOffset beacuse consumer not online,group=", group)
}
// RegistConsumer attaches a push consumer to this manager.
//
// On first use it lazily creates the manager-wide inner producer service
// (group constant.CLIENT_INNER_PRODUCER_GROUP), which is passed to the
// consumer's consume-message service. It then gives the consumer the shared
// MQ client and a remote offset store, records it in the consumer table under
// its group name, builds its rebalance component, and initialises its
// consume-message service.
//
// consumer.consumeMessageService must already be set by the caller; it is
// initialised here, not created.
func (self *MqClientManager) RegistConsumer(consumer *DefaultMQPushConsumer) {
	if self.defaultProducerService == nil {
		self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, config.NewProducerConfig(), self.mqClient)
	}
	consumer.mqClient = self.mqClient
	consumer.offsetStore = service.RemoteOffsetStoreInit(consumer.consumerGroup, self.mqClient)
	self.clientFactory.ConsumerTable[consumer.consumerGroup] = consumer
	consumer.rebalance = service.NewRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)
	// A leftover fmt.Println of consumeMessageService was removed here:
	// library code should not write debug output to stdout.
	consumer.consumeMessageService.Init(consumer.consumerGroup, self.mqClient, consumer.offsetStore, self.defaultProducerService, consumer.ConsumerConfig)
}
// Start brings the manager to life by delegating to StartAllScheduledTask.
func (self *MqClientManager) Start() {
	// Sending a heartbeat before scheduling is currently disabled:
	//self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat first
	self.StartAllScheduledTask()
}
// ShutDown is currently a no-op placeholder; it releases nothing.
//
// It uses a pointer receiver because MqClientManager contains sync.Mutex
// fields, and a value receiver would copy them on every call.
func (self *MqClientManager) ShutDown() {
}
// ClientFactory holds the producers and push consumers registered with an
// MqClientManager. Both tables are keyed by group name.
type ClientFactory struct {
ProducerTable map[string]*DefaultMQProducer // producer group -> producer
ConsumerTable map[string]*DefaultMQPushConsumer // consumer group -> push consumer
}
// ClientFactoryInit returns a ClientFactory whose producer and consumer tables
// are empty but non-nil, so entries can be added right away.
func ClientFactoryInit() (clientFactory *ClientFactory) {
	return &ClientFactory{
		ProducerTable: map[string]*DefaultMQProducer{},
		ConsumerTable: map[string]*DefaultMQPushConsumer{},
	}
}
// SendHeartbeatToAllBrokerWithLock builds the current heartbeat payload and
// passes it to the shared MQ client for delivery to all brokers.
//
// It returns an error, without sending anything, when no consumer is
// registered; otherwise it returns nil.
//
// It uses a pointer receiver because MqClientManager contains sync.Mutex
// fields, and a value receiver would copy them on every call.
//
// NOTE(review): despite the name, no lock is taken here (HeartBeatLock is
// unused in this method). Confirm whether callers serialise heartbeats
// before adding one.
// NOTE(review): a client with producers but no consumers never sends a
// heartbeat because only ConsumerDataSet is checked - confirm this is intended.
func (self *MqClientManager) SendHeartbeatToAllBrokerWithLock() error {
	heartbeatData := self.prepareHeartbeatData()
	if len(heartbeatData.ConsumerDataSet) == 0 {
		return errors.New("send heartbeat error")
	}
	self.mqClient.SendHeartbeatToAllBroker(heartbeatData)
	return nil
}
// UpdateTopicRouteInfoFromNameServer asks the shared MQ client to refresh
// route information for every topic this manager knows about: all topics
// subscribed to by registered consumers, followed by the client's publish
// topic list.
//
// Each distinct topic is refreshed once per call, even if several consumers
// subscribe to it or it is also a publish topic.
//
// It uses a pointer receiver because MqClientManager contains sync.Mutex
// fields, and a value receiver would copy them on every call.
func (self *MqClientManager) UpdateTopicRouteInfoFromNameServer() {
	var topics []string
	seen := make(map[string]struct{})
	// addTopic appends topic unless it was collected already, keeping
	// first-seen order.
	addTopic := func(topic string) {
		if _, dup := seen[topic]; dup {
			return
		}
		seen[topic] = struct{}{}
		topics = append(topics, topic)
	}

	for _, consumer := range self.clientFactory.ConsumerTable {
		for topic := range consumer.subscription {
			addTopic(topic)
		}
	}
	for _, topic := range self.mqClient.GetPublishTopicList() {
		addTopic(topic)
	}

	for _, topic := range topics {
		self.mqClient.UpdateTopicRouteInfoFromNameServer(topic)
	}
}
// prepareHeartbeatData assembles the heartbeat payload for this client: the
// client id reported by the shared MQ client, one ConsumerData entry per
// registered consumer group and one ProducerData entry per registered
// producer group.
//
// Both data sets are always non-nil, even when empty. Entry order follows map
// iteration and is therefore unspecified.
//
// It uses a pointer receiver because MqClientManager contains sync.Mutex
// fields, and a value receiver would copy them on every call.
func (self *MqClientManager) prepareHeartbeatData() *model.HeartbeatData {
	consumers := self.clientFactory.ConsumerTable
	producers := self.clientFactory.ProducerTable

	heartbeatData := new(model.HeartbeatData)
	heartbeatData.ClientId = self.mqClient.GetClientId()
	// Sized up front; one entry is appended per table entry below.
	heartbeatData.ConsumerDataSet = make([]*model.ConsumerData, 0, len(consumers))
	heartbeatData.ProducerDataSet = make([]*model.ProducerData, 0, len(producers))

	for group, consumer := range consumers {
		consumerData := new(model.ConsumerData)
		consumerData.GroupName = group
		consumerData.ConsumeType = consumer.consumeType
		consumerData.ConsumeFromWhere = consumer.ConsumerConfig.ConsumeFromWhere
		consumerData.MessageModel = consumer.messageModel
		consumerData.SubscriptionDataSet = consumer.Subscriptions()
		consumerData.UnitMode = consumer.unitMode
		heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData)
	}
	for group := range producers {
		producerData := new(model.ProducerData)
		producerData.GroupName = group
		heartbeatData.ProducerDataSet = append(heartbeatData.ProducerDataSet, producerData)
	}
	return heartbeatData
}