blob: 2f2a7b685e74736a108dc520453eb51ae209fd67 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package service
import (
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
"time"
)
type ProducerService interface {
CheckConfig() (err error)
SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error)
}
type DefaultProducerService struct {
producerGroup string
producerConfig *config.RocketMqProducerConfig
mqClient RocketMqClient
mqFaultStrategy MQFaultStrategy
}
func NewDefaultProducerService(producerGroup string, producerConfig *config.RocketMqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
defaultProducerService = &DefaultProducerService{
mqClient: mqClient,
producerGroup: producerGroup,
producerConfig: producerConfig,
}
defaultProducerService.CheckConfig()
return
}
func (self *DefaultProducerService) CheckConfig() (err error) {
// todo check if not pass panic
return
}
func (self *DefaultProducerService) SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error) {
var (
topicPublishInfo *model.TopicPublishInfo
)
err = self.checkMessage(message)
if err != nil {
return
}
topicPublishInfo, err = self.mqClient.TryToFindTopicPublishInfo(message.Topic)
if err != nil {
return
}
if topicPublishInfo.JudgeTopicPublishInfoOk() == false {
err = errors.New("topicPublishInfo is error,topic=" + message.Topic)
return
}
glog.V(2).Info("op=look topicPublishInfo", topicPublishInfo)
//if(!ok) return error
sendResult, err = self.sendMsgUseTopicPublishInfo(message, communicationMode, sendCallback, topicPublishInfo, timeout)
return
}
func (self *DefaultProducerService) producerSendMessageRequest(brokerAddr string, sendMessageHeader remoting.CustomerHeader, message *model.Message, timeout int64) (sendResult *model.SendResult, err error) {
remotingCommand := remoting.NewRemotingCommandWithBody(remoting.SEND_MESSAGE, sendMessageHeader, message.Body)
var response *remoting.RemotingCommand
response, err = self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, timeout)
if err != nil {
glog.Error(err)
return
}
sendResult, err = processSendResponse(brokerAddr, message, response)
return
}
func processSendResponse(brokerName string, message *model.Message, response *remoting.RemotingCommand) (sendResult *model.SendResult, err error) {
sendResult = &model.SendResult{}
switch response.Code {
case remoting.FLUSH_DISK_TIMEOUT:
{
sendResult.SetSendStatus(model.FlushDiskTimeout)
break
}
case remoting.FLUSH_SLAVE_TIMEOUT:
{
sendResult.SetSendStatus(model.FlushSlaveTimeout)
break
}
case remoting.SLAVE_NOT_AVAILABLE:
{
sendResult.SetSendStatus(model.SlaveNotAvaliable)
break
}
case remoting.SUCCESS:
{
sendResult.SetSendStatus(model.SendOK)
break
}
default:
err = errors.New("response.Code error code")
return
}
var responseHeader = &header.SendMessageResponseHeader{}
if response.ExtFields != nil {
responseHeader.FromMap(response.ExtFields) //change map[string]interface{} into CustomerHeader struct
}
sendResult.SetMsgID(message.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX])
sendResult.SetOffsetMsgID(responseHeader.MsgId)
sendResult.SetQueueOffset(responseHeader.QueueOffset)
sendResult.SetTransactionID(responseHeader.TransactionId)
messageQueue := model.MessageQueue{Topic: message.Topic, BrokerName: brokerName,
QueueId: responseHeader.QueueId}
sendResult.SetMessageQueue(messageQueue)
var regionId = responseHeader.MsgRegion
if len(regionId) == 0 {
regionId = "DefaultRegion"
}
sendResult.SetRegionID(regionId)
return
}
func (self *DefaultProducerService) checkMessage(message *model.Message) (err error) {
if message == nil {
err = errors.New("message is nil")
return
}
if len(message.Topic) == 0 {
err = errors.New("topic is empty")
return
}
if message.Topic == constant.DEFAULT_TOPIC {
err = errors.New("the topic[" + message.Topic + "] is conflict with default topic.")
return
}
if len(message.Topic) > constant.MAX_MESSAGE_TOPIC_SIZE {
err = errors.New("the specified topic is longer than topic max length 255.")
return
}
//todo todo public static final String VALID_PATTERN_STR = "";
if !util.MatchString(message.Topic, `^[%|a-zA-Z0-9_-]+$`) {
err = errors.New("the specified topic[" + message.Topic + "] contains illegal characters")
return
}
if len(message.Body) == 0 {
err = errors.New("messageBody is empty")
return
}
if len(message.Body) > self.producerConfig.MaxMessageSize {
err = errors.New("messageBody is large than " + util.IntToString(self.producerConfig.MaxMessageSize))
return
}
return
}
func (self *DefaultProducerService) sendMsgUseTopicPublishInfo(message *model.Message, communicationMode string, sendCallback string, topicPublishInfo *model.TopicPublishInfo, timeout int64) (sendResult *model.SendResult, err error) {
var (
sendTotalTime int
messageQueue model.MessageQueue
)
sendTotalTime = 1
var lastFailedBroker = ""
//todo transaction
// todo retry
for i := 0; i < sendTotalTime; i++ {
messageQueue, err = selectOneMessageQueue(topicPublishInfo, lastFailedBroker)
if err != nil {
return
}
sendResult, err = self.doSendMessage(message, messageQueue, communicationMode, sendCallback, topicPublishInfo, timeout)
if err != nil {
// todo retry
return
}
}
return
}
func (self *DefaultProducerService) doSendMessage(message *model.Message, messageQueue model.MessageQueue,
communicationMode string, sendCallback string,
topicPublishInfo *model.TopicPublishInfo,
timeout int64) (sendResult *model.SendResult, err error) {
var (
brokerAddr string
sysFlag int
compressMessageFlag int
)
compressMessageFlag, err = self.tryToCompressMessage(message)
if err != nil {
return
}
sysFlag = sysFlag | compressMessageFlag
brokerAddr = self.mqClient.FetchMasterBrokerAddress(messageQueue.BrokerName)
if len(brokerAddr) == 0 {
err = errors.New("The broker[" + messageQueue.BrokerName + "] not exist")
return
}
message.GeneratorMsgUniqueKey()
sendMessageHeader := &header.SendMessageRequestHeader{
ProducerGroup: self.producerGroup,
Topic: message.Topic,
DefaultTopic: constant.DEFAULT_TOPIC,
DefaultTopicQueueNums: 4,
QueueId: messageQueue.QueueId,
SysFlag: sysFlag,
BornTimestamp: time.Now().UnixNano() / 1000000,
Flag: message.Flag,
Properties: util.MessageProperties2String(message.Properties),
UnitMode: false,
ReconsumeTimes: message.GetReconsumeTimes(),
MaxReconsumeTimes: message.GetMaxReconsumeTimes(),
}
sendResult, err = self.producerSendMessageRequest(brokerAddr, sendMessageHeader, message, timeout)
return
}
func (self *DefaultProducerService) tryToCompressMessage(message *model.Message) (compressedFlag int, err error) {
if len(message.Body) < self.producerConfig.CompressMsgBodyOverHowMuch {
compressedFlag = 0
return
}
compressedFlag = int(constant.CompressedFlag)
message.Body, err = util.CompressWithLevel(message.Body, self.producerConfig.ZipCompressLevel)
return
}