// Source blob: 8ad91f05d45b3816b74afc6ade63ca2f0feca52c
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernel
import (
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
)
// ProducerService is the package-internal contract used to send messages.
type ProducerService interface {
// checkConfig reports a problem with the producer configuration as an error.
// NOTE(review): the implementation visible in this file is a stub that always returns nil.
checkConfig() (err error)
// sendDefaultImpl sends message and returns the resulting SendResult, or an error.
// In the implementation visible in this file, communicationMode, sendCallback and
// timeout are forwarded down the send path; only timeout is consumed there (by the
// synchronous remoting call).
sendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *rocketmqm.SendResult, err error)
}
// DefaultProducerService is the ProducerService implementation defined in this file.
type DefaultProducerService struct {
// producerGroup is sent as ProducerGroup in the header of every send-message request.
producerGroup string
// producerConfig supplies MaxMessageSize, CompressMsgBodyOverHowMuch and ZipCompressLevel.
producerConfig *rocketmqm.MqProducerConfig
// mqClient is used to look up topic publish info and master broker addresses,
// and to obtain the remoting client that performs the send.
mqClient RocketMqClient
// mqFaultStrategy is not set by newDefaultProducerService and is not referenced
// by the code visible in this file.
mqFaultStrategy mqFaultStrategy
}
// newDefaultProducerService builds a DefaultProducerService from the given
// producer group, configuration and client, then runs checkConfig on it.
//
// The error returned by checkConfig is discarded, and mqFaultStrategy is left
// at its zero value.
func newDefaultProducerService(producerGroup string, producerConfig *rocketmqm.MqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
	defaultProducerService = new(DefaultProducerService)
	defaultProducerService.producerGroup = producerGroup
	defaultProducerService.producerConfig = producerConfig
	defaultProducerService.mqClient = mqClient

	// The result of the configuration check is not acted upon here.
	_ = defaultProducerService.checkConfig()
	return defaultProducerService
}
// checkConfig is the hook for validating the producer configuration.
// It performs no checks yet and always returns nil.
func (d *DefaultProducerService) checkConfig() (err error) {
	// todo check if not pass panic
	return nil
}
// sendDefaultImpl validates message, resolves the publish info for its topic
// and hands both to sendMsgUseTopicPublishInfo.
//
// It returns a nil result and an error when the message fails checkMessage,
// when the publish info lookup fails, or when the publish info is reported as
// not OK. communicationMode, sendCallback and timeout are forwarded unchanged.
func (d *DefaultProducerService) sendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *rocketmqm.SendResult, err error) {
	if err = d.checkMessage(message); err != nil {
		return nil, err
	}

	publishInfo, err := d.mqClient.tryToFindTopicPublishInfo(message.Topic())
	if err != nil {
		return nil, err
	}
	if !publishInfo.JudgeTopicPublishInfoOk() {
		return nil, errors.New("topicPublishInfo is error,topic=" + message.Topic())
	}
	glog.V(2).Info("op=look topicPublishInfo", publishInfo)

	return d.sendMsgUseTopicPublishInfo(message, communicationMode, sendCallback, publishInfo, timeout)
}
// producerSendMessageRequest wraps sendMessageHeader and the message body in a
// SEND_MESSAGE remoting command, invokes it synchronously against brokerAddr
// and converts the response with processSendResponse.
//
// A failed invocation is logged and returned together with a nil result.
func (d *DefaultProducerService) producerSendMessageRequest(brokerName, brokerAddr string, sendMessageHeader remoting.CustomerHeader, message *message.MessageImpl, timeout int64) (sendResult *rocketmqm.SendResult, err error) {
	request := remoting.NewRemotingCommandWithBody(remoting.SEND_MESSAGE, sendMessageHeader, message.Body())

	response, invokeErr := d.mqClient.getRemotingClient().InvokeSync(brokerAddr, request, timeout)
	if invokeErr != nil {
		glog.Error(invokeErr)
		return nil, invokeErr
	}

	return processSendResponse(brokerName, message, response)
}
// processSendResponse converts the broker's response to a send request into a
// SendResult.
//
// The response code is mapped to a send status:
//
//	FLUSH_DISK_TIMEOUT  -> FlushDiskTimeout
//	FLUSH_SLAVE_TIMEOUT -> FlushSlaveTimeout
//	SLAVE_NOT_AVAILABLE -> SlaveNotAvaliable
//	SUCCESS             -> SendOK
//
// Any other code, or a nil response, yields an error; in that case the returned
// SendResult is non-nil but carries no meaningful data.
//
// On success the message ID is read from the message's
// PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX property, while the offset message ID,
// queue offset, transaction ID, queue ID and region come from the response
// header. An empty region is replaced by "DefaultRegion".
func processSendResponse(brokerName string, message *message.MessageImpl, response *remoting.RemotingCommand) (sendResult *rocketmqm.SendResult, err error) {
	sendResult = &rocketmqm.SendResult{}

	// Report a missing response as an error instead of panicking on response.Code.
	if response == nil {
		err = errors.New("response is nil")
		return
	}

	switch response.Code {
	case remoting.FLUSH_DISK_TIMEOUT:
		sendResult.SetSendStatus(rocketmqm.FlushDiskTimeout)
	case remoting.FLUSH_SLAVE_TIMEOUT:
		sendResult.SetSendStatus(rocketmqm.FlushSlaveTimeout)
	case remoting.SLAVE_NOT_AVAILABLE:
		sendResult.SetSendStatus(rocketmqm.SlaveNotAvaliable)
	case remoting.SUCCESS:
		sendResult.SetSendStatus(rocketmqm.SendOK)
	default:
		err = errors.New("response.Code error_code=" + util.IntToString(int(response.Code)))
		return
	}

	responseHeader := &header.SendMessageResponseHeader{}
	if response.ExtFields != nil {
		// Populate the typed header from the response's extension fields.
		responseHeader.FromMap(response.ExtFields)
	}

	sendResult.SetMsgID(message.PropertiesKeyValue(constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX))
	sendResult.SetOffsetMsgID(responseHeader.MsgId)
	sendResult.SetQueueOffset(responseHeader.QueueOffset)
	sendResult.SetTransactionID(responseHeader.TransactionId)
	sendResult.SetMessageQueue(rocketmqm.MessageQueue{
		Topic:      message.Topic(),
		BrokerName: brokerName,
		QueueId:    responseHeader.QueueId,
	})

	regionID := responseHeader.MsgRegion
	if len(regionID) == 0 {
		regionID = "DefaultRegion"
	}
	sendResult.SetRegionID(regionID)
	return
}
// checkMessage verifies that message can be sent and returns an error naming
// the first rule it violates, or nil when every rule passes.
//
// Rules, in order: the message is non-nil; the topic is non-empty, differs
// from constant.DEFAULT_TOPIC, is no longer than
// constant.MAX_MESSAGE_TOPIC_SIZE and matches `^[%|a-zA-Z0-9_-]+$`; the body
// is non-empty and no larger than producerConfig.MaxMessageSize.
func (d *DefaultProducerService) checkMessage(message *message.MessageImpl) (err error) {
	if message == nil {
		return errors.New("message is nil")
	}

	topic := message.Topic()
	switch {
	case len(topic) == 0:
		return errors.New("topic is empty")
	case topic == constant.DEFAULT_TOPIC:
		return errors.New("the topic[" + topic + "] is conflict with default topic.")
	case len(topic) > constant.MAX_MESSAGE_TOPIC_SIZE:
		return errors.New("the specified topic is longer than topic max length 255.")
	case !util.MatchString(topic, `^[%|a-zA-Z0-9_-]+$`):
		return errors.New("the specified topic[" + topic + "] contains illegal characters")
	}

	bodySize := len(message.Body())
	switch {
	case bodySize == 0:
		return errors.New("messageBody is empty")
	case bodySize > d.producerConfig.MaxMessageSize:
		return errors.New("messageBody is large than " + util.IntToString(d.producerConfig.MaxMessageSize))
	}
	return nil
}
// sendMsgUseTopicPublishInfo selects a message queue from topicPublishInfo and
// sends message to it through doSendMessage.
//
// Exactly one attempt is made: a failure to select a queue or to send is
// returned immediately. Retry and transaction handling are still todo.
func (d *DefaultProducerService) sendMsgUseTopicPublishInfo(message *message.MessageImpl, communicationMode string, sendCallback string, topicPublishInfo *model.TopicPublishInfo, timeout int64) (sendResult *rocketmqm.SendResult, err error) {
	//todo transaction
	// todo retry
	const maxAttempts = 1
	lastFailedBroker := ""

	for attempt := 0; attempt < maxAttempts; attempt++ {
		var queue rocketmqm.MessageQueue
		if queue, err = selectOneMessageQueue(topicPublishInfo, lastFailedBroker); err != nil {
			return sendResult, err
		}
		if sendResult, err = d.doSendMessage(message, queue, communicationMode, sendCallback, topicPublishInfo, timeout); err != nil {
			// todo retry
			return sendResult, err
		}
	}
	return sendResult, err
}
// doSendMessage sends message to the master broker that owns messageQueue.
//
// Steps, in order: compress the body when tryToCompressMessage decides to,
// resolve the master broker address for messageQueue.BrokerName, generate the
// message's unique key, build the SendMessageRequestHeader and issue the
// request through producerSendMessageRequest.
//
// The message's original body is put back before returning, so compression
// applied for the wire is not left on the caller's message and a later send of
// the same message does not compress it a second time.
//
// It returns an error when compression fails, when no address is known for the
// broker, or when the request itself fails. communicationMode, sendCallback
// and topicPublishInfo are currently unused.
func (d *DefaultProducerService) doSendMessage(message *message.MessageImpl, messageQueue rocketmqm.MessageQueue,
	communicationMode string, sendCallback string,
	topicPublishInfo *model.TopicPublishInfo,
	timeout int64) (sendResult *rocketmqm.SendResult, err error) {
	// tryToCompressMessage may replace the body in place; restore it on every
	// exit path. NOTE(review): assumes SetBody only stores the slice — confirm.
	originalBody := message.Body()
	defer message.SetBody(originalBody)

	compressMessageFlag, err := d.tryToCompressMessage(message)
	if err != nil {
		return nil, err
	}
	var sysFlag int
	sysFlag |= compressMessageFlag

	brokerAddr := d.mqClient.fetchMasterBrokerAddress(messageQueue.BrokerName)
	if len(brokerAddr) == 0 {
		return nil, errors.New("The broker[" + messageQueue.BrokerName + "] not exist")
	}

	// The unique key must be generated before the properties are serialized below.
	message.GeneratorMsgUniqueKey()
	sendMessageHeader := &header.SendMessageRequestHeader{
		ProducerGroup:         d.producerGroup,
		Topic:                 message.Topic(),
		DefaultTopic:          constant.DEFAULT_TOPIC,
		DefaultTopicQueueNums: 4,
		QueueId:               messageQueue.QueueId,
		SysFlag:               sysFlag,
		BornTimestamp:         util.CurrentTimeMillisInt64(),
		Flag:                  message.Flag(),
		Properties:            util.MessageProperties2String(message.Properties()),
		UnitMode:              false,
		ReconsumeTimes:        message.GetReconsumeTimes(),
		MaxReconsumeTimes:     message.GetMaxReconsumeTimes(),
	}
	return d.producerSendMessageRequest(messageQueue.BrokerName, brokerAddr, sendMessageHeader, message, timeout)
}
// tryToCompressMessage compresses the message body in place when its length is
// at least producerConfig.CompressMsgBodyOverHowMuch.
//
// It returns constant.CompressedFlag (as an int) when the body was replaced by
// its compressed form, and 0 when the body was left alone. If compression
// fails, the error is returned, the body is not modified and the flag is 0.
func (d *DefaultProducerService) tryToCompressMessage(message *message.MessageImpl) (compressedFlag int, err error) {
	if len(message.Body()) < d.producerConfig.CompressMsgBodyOverHowMuch {
		return 0, nil
	}

	var compressBody []byte
	compressBody, err = util.CompressWithLevel(message.Body(), d.producerConfig.ZipCompressLevel)
	if err != nil {
		// Do not overwrite the body with a failed compression result, and do
		// not claim the message is compressed.
		return 0, err
	}

	message.SetBody(compressBody)
	return int(constant.CompressedFlag), nil
}