blob: 290da274c3d0e70e20621fe46dbd8a08ed3dfde3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package service
import (
"encoding/json"
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
"github.com/golang/glog"
)
type SendMessageBackProducerService interface {
SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
}
type SendMessageBackProducerServiceImpl struct {
mqClient RocketMqClient
defaultProducerService *DefaultProducerService // one namesvr only one
consumerGroup string
consumerConfig *config.RocketMqConsumerConfig //one mq group have one
}
// send to original broker,if fail send a new retry message
func (self *SendMessageBackProducerServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
glog.V(2).Info("op=look_send_message_back", messageExt.MsgId, messageExt.Properties, string(messageExt.Body))
err = self.consumerSendMessageBack(brokerName, messageExt, delayLayLevel) // todo use
if err == nil {
return
}
glog.Error(err)
err = self.sendRetryMessageBack(messageExt)
return
}
func (self *SendMessageBackProducerServiceImpl) sendRetryMessageBack(messageExt *model.MessageExt) error {
// todo build a retry topic todo check todo check
retryMessage := &model.Message{}
originMessageId := messageExt.GetOriginMessageId()
retryMessage.Properties = messageExt.Properties
retryMessage.SetOriginMessageId(originMessageId)
retryMessage.Flag = messageExt.Flag
retryMessage.Topic = constant.RETRY_GROUP_TOPIC_PREFIX + self.consumerGroup
retryMessage.Body = messageExt.Body
retryMessage.SetRetryTopic(messageExt.Topic)
retryMessage.SetReconsumeTime(messageExt.GetReconsumeTimes() + 1)
retryMessage.SetMaxReconsumeTimes(self.consumerConfig.MaxReconsumeTimes)
retryMessage.SetDelayTimeLevel(3 + messageExt.GetReconsumeTimes())
pp, _ := json.Marshal(retryMessage)
glog.Info("look retryMessage ", string(pp), string(messageExt.Body))
sendResult, err := self.defaultProducerService.SendDefaultImpl(retryMessage, constant.COMMUNICATIONMODE_SYNC, "", self.defaultProducerService.producerConfig.SendMsgTimeout)
if err != nil {
glog.Error(err)
return err
}
xx, _ := json.Marshal(sendResult)
glog.Info("look retryMessage result", string(xx))
// todo need check send result
return nil
}
func (self *SendMessageBackProducerServiceImpl) InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
self.mqClient = mqClient
self.consumerGroup = consumerGroup
self.defaultProducerService = defaultProducerService
self.consumerConfig = consumerConfig
}
func (self *SendMessageBackProducerServiceImpl) consumerSendMessageBack(brokerName string, messageExt *model.MessageExt, delayLayLevel int) (err error) {
if len(brokerName) == 0 {
err = errors.New("broker can't be empty")
glog.Error(err)
return
}
brokerAddr := self.mqClient.FetchMasterBrokerAddress(brokerName)
sendMsgBackHeader := &header.ConsumerSendMsgBackRequestHeader{
Offset: messageExt.CommitLogOffset,
Group: self.consumerGroup,
DelayLevel: 0, //Message consume retry strategy<br>-1,no retry,put into DLQ directly<br>0,broker control retry frequency<br>>0,client control retry frequency
OriginMsgId: messageExt.MsgId,
OriginTopic: messageExt.Topic,
UnitMode: false,
MaxReconsumeTimes: int32(self.consumerConfig.MaxReconsumeTimes),
}
remotingCommand := remoting.NewRemotingCommand(remoting.CONSUMER_SEND_MSG_BACK, sendMsgBackHeader)
response, invokeErr := self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, 5000)
if invokeErr != nil {
err = invokeErr
return
}
if response == nil || response.Code != remoting.SUCCESS {
glog.Error("sendMsgBackRemarkError", response.Remark)
err = errors.New("send Message back error")
}
return
}