blob: 54e9412f12a25ae78bd5d6f67fae4c3c68cae1c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package golang
import (
"context"
"fmt"
"math"
"sync"
"time"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"google.golang.org/protobuf/types/known/durationpb"
)
// Producer is the message-publishing client exposed by this package.
type Producer interface {
	// Send publishes msg and returns the receipts produced by the send.
	Send(ctx context.Context, msg *Message) ([]*SendReceipt, error)
	// SendWithTransaction publishes msg as part of the given transaction.
	SendWithTransaction(ctx context.Context, msg *Message, transaction Transaction) ([]*SendReceipt, error)
	// SendAsync publishes msg in the background and reports the outcome
	// through the callback.
	SendAsync(ctx context.Context, msg *Message, callback func(context.Context, []*SendReceipt, error))
	// BeginTransaction creates a new Transaction bound to this producer.
	BeginTransaction() Transaction
	// Start brings the producer up.
	Start() error
	// GracefulStop shuts the producer down.
	GracefulStop() error
	isClient
}
// defaultProducer is the Producer implementation returned by NewProducer.
type defaultProducer struct {
// po is a copy of the options the producer was constructed with.
po producerOptions
// pSetting holds the producer settings (retry policy, request timeout,
// known topics, ...); NewProducer also installs it as the client's settings.
pSetting *producerSettings
// cli is the underlying client used for start-up, signing, hooks and RPCs.
cli *defaultClient
// checker is the transaction checker taken from the options; nil means no
// checker was registered.
checker *TransactionChecker
// isolated records, keyed by parsed address, endpoints whose send failed;
// it is handed to the load balancer when picking queues.
isolated sync.Map
// publishingRouteDataResultCache caches a PublishingLoadBalancer per topic name.
publishingRouteDataResultCache sync.Map
}
// Start starts the underlying client. When start-up fails the producer is
// stopped again; the returned error wraps the start-up error and, if the
// shutdown failed as well, also mentions the shutdown error.
func (p *defaultProducer) Start() error {
	startErr := p.cli.startUp()
	if startErr == nil {
		return nil
	}
	// Start-up failed: release whatever was acquired so far.
	if stopErr := p.GracefulStop(); stopErr != nil {
		return fmt.Errorf("startUp err=%w, shutdown err=%v", startErr, stopErr)
	}
	return fmt.Errorf("startUp err=%w", startErr)
}
// Compile-time check that *defaultProducer satisfies Producer.
var _ Producer = (*defaultProducer)(nil)
// isClient is an intentionally empty marker method; it lets defaultProducer
// satisfy the isClient part of the Producer interface.
func (p *defaultProducer) isClient() {}
// isOn reports the current value of the underlying client's "on" flag.
func (p *defaultProducer) isOn() bool {
	running := p.cli.on.Load()
	return running
}
// wrapHeartbeatRequest builds a heartbeat request that identifies this client
// as a producer.
func (p *defaultProducer) wrapHeartbeatRequest() *v2.HeartbeatRequest {
	req := new(v2.HeartbeatRequest)
	req.ClientType = v2.ClientType_PRODUCER
	return req
}
// takeMessageQueues asks the load balancer for candidate queues, passing the
// isolated-endpoint set and the configured maximum number of attempts.
//
// NOTE(review): p.isolated is a sync.Map handed over by value, which copies
// it; changing that requires changing the PublishingLoadBalancer signature.
func (p *defaultProducer) takeMessageQueues(plb PublishingLoadBalancer) ([]*v2.MessageQueue, error) {
	count := p.getRetryMaxAttempts()
	return plb.TakeMessageQueues(p.isolated, count)
}
// getPublishingTopicRouteResult returns the publishing load balancer for
// topic. A cached balancer is reused when present; otherwise the topic's
// message queues are fetched through the client, a new balancer is built from
// them and stored in the cache.
func (p *defaultProducer) getPublishingTopicRouteResult(ctx context.Context, topic string) (PublishingLoadBalancer, error) {
	if cached, found := p.publishingRouteDataResultCache.Load(topic); found {
		if lb, isBalancer := cached.(PublishingLoadBalancer); isBalancer {
			return lb, nil
		}
	}

	// Cache miss (or unexpected cached type): resolve the route afresh.
	queues, err := p.cli.getMessageQueues(ctx, topic)
	if err != nil {
		return nil, err
	}
	lb, err := NewPublishingLoadBalancer(queues)
	if err != nil {
		return nil, err
	}
	p.publishingRouteDataResultCache.Store(topic, lb)
	return lb, nil
}
// wrapSendMessageRequest converts the given publishing messages to their
// protobuf form and bundles them into a single SendMessageRequest.
//
// It returns an error wrapping the conversion failure of the first message
// that cannot be converted; the cause stays reachable via errors.Is/As.
func (p *defaultProducer) wrapSendMessageRequest(pMsgs []*PublishingMessage) (*v2.SendMessageRequest, error) {
	smr := &v2.SendMessageRequest{
		// Non-nil even for empty input, sized for the known message count.
		Messages: make([]*v2.Message, 0, len(pMsgs)),
	}
	for _, pMsg := range pMsgs {
		msgV2, err := pMsg.toProtobuf()
		if err != nil {
			// %w keeps the message text unchanged while preserving the error chain.
			return nil, fmt.Errorf("wrapSendMessageRequest failed, {%w}", err)
		}
		smr.Messages = append(smr.Messages, msgV2)
	}
	return smr, nil
}
// NewProducer is a package-level variable holding the Producer constructor.
//
// It applies opts on top of a copy of the default producer options, creates
// the underlying client through the configured client factory, parses the
// endpoint from config and builds the producer settings (retry policy with
// zero back-off, request timeout taken from the client options, message-type
// validation enabled, 4 MiB maximum body size). Topics given in the options
// are registered both as the client's initial topics and in the settings.
//
// An error is returned when the client factory fails, when the factory yields
// a client that is not a *defaultClient, or when the endpoint cannot be parsed.
var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) {
	copyOpt := defaultProducerOptions
	po := &copyOpt
	for _, opt := range opts {
		opt.apply(po)
	}
	cli, err := po.clientFunc(config)
	if err != nil {
		return nil, err
	}
	// A custom client factory may return something else; report it instead
	// of panicking on the type assertion.
	dc, ok := cli.(*defaultClient)
	if !ok {
		return nil, fmt.Errorf("unexpected client type %T returned by client factory", cli)
	}
	p := &defaultProducer{
		po:      *po,
		cli:     dc,
		checker: po.checker,
	}
	p.cli.initTopics = po.topics
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	}
	p.pSetting = &producerSettings{
		clientId:   p.cli.GetClientID(),
		endpoints:  endpoints,
		clientType: v2.ClientType_PRODUCER,
		retryPolicy: &v2.RetryPolicy{
			MaxAttempts: po.maxAttempts,
			Strategy: &v2.RetryPolicy_ExponentialBackoff{
				ExponentialBackoff: &v2.ExponentialBackoff{
					Max:        durationpb.New(time.Duration(0)),
					Initial:    durationpb.New(time.Duration(0)),
					Multiplier: 1,
				},
			},
		},
		requestTimeout:      p.cli.opts.timeout,
		validateMessageType: true,
		maxBodySizeBytes:    4 * 1024 * 1024,
	}
	for _, topic := range po.topics {
		topicResource := &v2.Resource{
			Name: topic,
		}
		p.pSetting.topics.Store(topic, topicResource)
	}
	// Wire the producer into the client so it can call back into it.
	p.cli.settings = p.pSetting
	p.cli.clientImpl = p
	return p, nil
}
// getRetryMaxAttempts returns the maximum number of send attempts taken from
// the retry policy in the producer settings.
func (p *defaultProducer) getRetryMaxAttempts() int {
	policy := p.pSetting.GetRetryPolicy()
	return int(policy.GetMaxAttempts())
}
// getNextAttemptDelay returns how long to wait before the given attempt
// (1-based) according to the exponential back-off in the retry policy:
//
//	min(initial * multiplier^(attempt-1), max)
//
// It returns 0 when attempt is not positive, when the policy is missing or is
// not an exponential back-off, or when the computed delay is not a positive
// number.
func (p *defaultProducer) getNextAttemptDelay(attempt int) time.Duration {
	if attempt <= 0 {
		return 0
	}
	policy := p.pSetting.GetRetryPolicy()
	if policy == nil {
		return 0
	}
	// Checked assertion: any other strategy means "no delay" rather than a panic.
	strategy, ok := policy.Strategy.(*v2.RetryPolicy_ExponentialBackoff)
	if !ok || strategy.ExponentialBackoff == nil {
		return 0
	}
	backoff := strategy.ExponentialBackoff

	initialNanos := float64(backoff.Initial.AsDuration().Nanoseconds())
	maxNanos := float64(backoff.Max.AsDuration().Nanoseconds())
	growth := math.Pow(float64(backoff.Multiplier), float64(attempt-1))
	delayNanos := math.Min(initialNanos*growth, maxNanos)

	// 0*Inf yields NaN; treat it (and non-positive results) as "no delay".
	if math.IsNaN(delayNanos) || delayNanos <= 0 {
		return 0
	}
	// delayNanos is capped by maxNanos, which came from an int64 nanosecond
	// count, so converting to time.Duration stays in range. The previous
	// uint32 conversion was out of range for anything above ~4.29s.
	return time.Duration(delayNanos)
}
// send1 performs one send attempt (attempt is 1-based) and retries itself
// recursively on failure.
//
// The target queue is candidates[(attempt-1) mod len(candidates)]. When
// message-type validation is enabled and the queue does not accept
// messageType, an error is returned without sending. Send hooks are invoked
// before and after the RPC. A non-OK response status is converted into an
// *ErrRpcStatus.
//
// On failure the addresses of the used endpoints are recorded in p.isolated.
// No further attempt is made when the attempt budget is exhausted or the
// message is transactional. Otherwise the send is retried immediately, except
// for TOO_MANY_REQUESTS where it waits for the back-off delay first. That wait
// is aborted when ctx is done, in which case the returned error wraps
// ctx.Err() and mentions the last send error.
//
// On success one SendReceipt per response entry is returned.
func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v2.MessageType,
	candidates []*v2.MessageQueue, pubMessages []*PublishingMessage, attempt int) ([]*SendReceipt, error) {
	ctx = p.cli.Sign(ctx)

	idx := utils.Mod(int32(attempt)-1, len(candidates))
	selectMessageQueue := candidates[idx]

	if p.pSetting.IsValidateMessageType() && !utils.MatchMessageType(selectMessageQueue, messageType) {
		return nil, fmt.Errorf("current message type not match with topic accept message types")
	}

	endpoints := selectMessageQueue.GetBroker().GetEndpoints()
	sendReq, err := p.wrapSendMessageRequest(pubMessages)
	if err != nil {
		return nil, err
	}

	messageCommons := make([]*MessageCommon, 0, len(pubMessages))
	for _, pubMessage := range pubMessages {
		messageCommons = append(messageCommons, pubMessage.msg.GetMessageCommon())
	}
	p.cli.doBefore(MessageHookPoints_SEND, messageCommons)

	watchTime := time.Now()
	resp, err := p.cli.clientManager.SendMessage(ctx, endpoints, sendReq, p.pSetting.GetRequestTimeout())
	duration := time.Since(watchTime)
	messageHookPointsStatus := MessageHookPointsStatus_OK

	// Turn a non-OK response status into an error.
	tooManyRequests := false
	if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
		tooManyRequests = resp.GetStatus().GetCode() == v2.Code_TOO_MANY_REQUESTS
		err = &ErrRpcStatus{
			Code:    int32(resp.Status.GetCode()),
			Message: resp.GetStatus().GetMessage(),
		}
	}
	if err != nil {
		messageHookPointsStatus = MessageHookPointsStatus_ERROR
	}
	p.cli.doAfter(MessageHookPoints_SEND, messageCommons, duration, messageHookPointsStatus)

	maxAttempts := p.getRetryMaxAttempts()
	if err != nil {
		messageIds := make([]string, 0, len(pubMessages))
		for _, pubMessage := range pubMessages {
			messageIds = append(messageIds, pubMessage.messageId)
		}
		// Remember the failing endpoints so queue selection can take them
		// into account.
		for _, address := range endpoints.GetAddresses() {
			p.isolated.Store(utils.ParseAddress(address), true)
		}
		if attempt >= maxAttempts {
			p.cli.log.Errorf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			return nil, err
		}
		// No need more attempts for transactional message.
		if messageType == v2.MessageType_TRANSACTION {
			p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			return nil, err
		}
		// Try to do more attempts.
		nextAttempt := attempt + 1
		// Retry immediately if the request is not throttled.
		if tooManyRequests {
			waitTime := p.getNextAttemptDelay(nextAttempt)
			p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				waitTime, topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			if waitTime > 0 {
				// Wait for the back-off, but give up as soon as the caller's
				// context is cancelled or times out.
				timer := time.NewTimer(waitTime)
				select {
				case <-ctx.Done():
					timer.Stop()
					return nil, fmt.Errorf("send retry aborted: %w, last error: %v", ctx.Err(), err)
				case <-timer.C:
				}
			}
		} else {
			p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
		}
		return p.send1(ctx, topic, messageType, candidates, pubMessages, nextAttempt)
	}

	var res []*SendReceipt
	for i := 0; i < len(resp.GetEntries()); i++ {
		res = append(res, &SendReceipt{
			MessageID:     resp.GetEntries()[i].GetMessageId(),
			TransactionId: resp.GetEntries()[i].GetTransactionId(),
			Offset:        resp.GetEntries()[i].GetOffset(),
			Endpoints:     endpoints,
		})
	}
	if attempt > 1 {
		p.cli.log.Infof("resend message successfully, topic=%s, maxAttempts=%d, attempt=%d, endpoints=%s",
			topic, maxAttempts, attempt, endpoints.String())
	}
	return res, nil
}
// equalMessageGroups reports whether two optional message groups are equal:
// both unset, or both set to the same string.
func equalMessageGroups(a, b *string) bool {
	if a == nil || b == nil {
		return a == b
	}
	return *a == *b
}

// send0 validates a batch of messages and hands it to send1 for delivery.
//
// All messages must target the same topic and have the same message type; for
// FIFO messages they must also share the same message group. Messages that do
// not yet carry a PublishingMessage get one built from the producer settings
// (txEnabled is forwarded to NewPublishingMessage). The topic is registered in
// the producer settings if it is not there yet, the publishing load balancer
// for the topic is resolved, and candidate queues are picked: by message group
// when one is set, through takeMessageQueues otherwise.
//
// It returns an error when msgs is empty, when validation fails, when the
// route cannot be resolved, or when no candidate queue is available.
func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txEnabled bool) ([]*SendReceipt, error) {
	if len(msgs) == 0 {
		return nil, fmt.Errorf("no message to send")
	}
	// check topic Name
	topicName := msgs[0].GetMessage().Topic
	for _, msg := range msgs {
		if msg.GetMessage().Topic != topicName {
			return nil, fmt.Errorf("messages to send have different topics")
		}
	}

	pubMessages := make([]*PublishingMessage, len(msgs))
	for idx, uMsg := range msgs {
		pubMessage := uMsg.pubMsg
		if pubMessage == nil {
			var err error
			pubMessage, err = NewPublishingMessage(uMsg.GetMessage(), p.pSetting, txEnabled)
			if err != nil {
				return nil, err
			}
		}
		pubMessages[idx] = pubMessage
	}

	// check message Type
	messageType := pubMessages[0].messageType
	for _, pubMessage := range pubMessages {
		if pubMessage.messageType != messageType {
			return nil, fmt.Errorf("messages to send have different types, please check")
		}
	}

	var messageGroup *string
	// Message group must be same if message type is FIFO, or no need to proceed.
	if messageType == v2.MessageType_FIFO {
		messageGroup = pubMessages[0].msg.GetMessageGroup()
		for _, pubMessage := range pubMessages {
			// Compare the group values, not the pointers holding them.
			if !equalMessageGroups(pubMessage.msg.GetMessageGroup(), messageGroup) {
				return nil, fmt.Errorf("fifo messages to send have different message groups")
			}
		}
	}

	if _, ok := p.pSetting.topics.Load(topicName); !ok {
		p.pSetting.topics.Store(topicName, &v2.Resource{
			Name: topicName,
		})
	}

	pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
	if err != nil {
		return nil, err
	}

	var candidates []*v2.MessageQueue
	if messageGroup == nil {
		candidates, err = p.takeMessageQueues(pubLoadBalancer)
	} else {
		candidates, err = pubLoadBalancer.TakeMessageQueueByMessageGroup(messageGroup)
	}
	if err != nil {
		// Keep the cause instead of swallowing it.
		return nil, fmt.Errorf("no broker available to sendMessage: %w", err)
	}
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no broker available to sendMessage")
	}
	return p.send1(ctx, topicName, messageType, candidates, pubMessages, 1)
}
// Send publishes a single message without transaction support and returns the
// resulting receipts. It fails immediately when the producer is not running.
func (p *defaultProducer) Send(ctx context.Context, msg *Message) ([]*SendReceipt, error) {
	if running := p.isOn(); !running {
		return nil, fmt.Errorf("producer is not running")
	}
	single := &UnifiedMessage{msg: msg}
	return p.send0(ctx, []*UnifiedMessage{single}, false)
}
// SendAsync publishes a single message in a new goroutine and reports the
// outcome by calling f with the receipts and error of the send.
//
// When the producer is not running, f is called once, synchronously, with a
// "producer is not running" error and no send is attempted.
func (p *defaultProducer) SendAsync(ctx context.Context, msg *Message, f func(context.Context, []*SendReceipt, error)) {
	if !p.isOn() {
		f(ctx, nil, fmt.Errorf("producer is not running"))
		// Stop here: otherwise the send would still be attempted and f
		// would be invoked a second time.
		return
	}
	go func() {
		msgs := []*UnifiedMessage{{
			msg: msg,
		}}
		resp, err := p.send0(ctx, msgs, false)
		f(ctx, resp, err)
	}()
}
// SendWithTransaction publishes msg as part of the given transaction.
//
// The message is first added to the transaction, then sent with transaction
// support enabled, and finally the first receipt is recorded on the
// transaction. A failure to record the receipt is logged but not returned.
//
// It returns an error when the producer is not running, when transaction is
// not a *transactionImpl, when the message cannot be added to the
// transaction, when the send fails, or when the send yields no receipt.
func (p *defaultProducer) SendWithTransaction(ctx context.Context, msg *Message, transaction Transaction) ([]*SendReceipt, error) {
	if !p.isOn() {
		return nil, fmt.Errorf("producer is not running")
	}
	// Foreign Transaction implementations are rejected rather than panicking.
	t, ok := transaction.(*transactionImpl)
	if !ok {
		return nil, fmt.Errorf("unsupported transaction implementation %T", transaction)
	}
	pubMessage, err := t.tryAddMessage(msg)
	if err != nil {
		return nil, err
	}
	pubMsgs := []*UnifiedMessage{{
		pubMsg: pubMessage,
	}}
	resp, err := p.send0(ctx, pubMsgs, true)
	if err != nil {
		return nil, err
	}
	// A successful send with no entries would otherwise panic on resp[0].
	if len(resp) == 0 {
		return nil, fmt.Errorf("no send receipt returned for transactional message")
	}
	if receiptErr := t.tryAddReceipt(pubMessage, resp[0]); receiptErr != nil {
		p.cli.log.Error(receiptErr)
	}
	return resp, nil
}
// GracefulStop stops the producer by delegating to the underlying client's
// GracefulStop and returns its result.
func (p *defaultProducer) GracefulStop() error {
	err := p.cli.GracefulStop()
	return err
}
// BeginTransaction creates a new transaction bound to this producer.
func (p *defaultProducer) BeginTransaction() Transaction {
	tx := NewTransactionImpl(p)
	return tx
}
// endTransaction sends an EndTransaction request for the given message and
// transaction to endpoints.
//
// COMMIT and ROLLBACK map to the matching protobuf resolution; any other value
// is sent as TRANSACTION_RESOLUTION_UNSPECIFIED. The rollback hook point is
// used for ROLLBACK and the commit hook point for everything else. Hooks run
// before and after the RPC, the latter with the measured duration and an
// OK/ERROR status. A non-OK response status is returned as *ErrRpcStatus.
func (p *defaultProducer) endTransaction(ctx context.Context, endpoints *v2.Endpoints, messageCommon *MessageCommon,
	messageId string, transactionId string, resolution TransactionResolution) error {
	ctx = p.cli.Sign(ctx)

	// Derive the wire resolution and the hook point in a single pass.
	hookPoint := MessageHookPoints_COMMIT_TRANSACTION
	wireResolution := v2.TransactionResolution_TRANSACTION_RESOLUTION_UNSPECIFIED
	switch resolution {
	case COMMIT:
		wireResolution = v2.TransactionResolution_COMMIT
	case ROLLBACK:
		wireResolution = v2.TransactionResolution_ROLLBACK
		hookPoint = MessageHookPoints_ROLLBACK_TRANSACTION
	}

	req := &v2.EndTransactionRequest{
		Topic:         &v2.Resource{Name: messageCommon.topic},
		MessageId:     messageId,
		TransactionId: transactionId,
		Resolution:    wireResolution,
	}
	timeout := p.pSetting.requestTimeout
	commons := []*MessageCommon{messageCommon}

	p.cli.doBefore(hookPoint, commons)
	startedAt := time.Now()
	resp, err := p.cli.clientManager.EndTransaction(ctx, endpoints, req, timeout)
	elapsed := time.Since(startedAt)

	if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
		err = &ErrRpcStatus{
			Code:    int32(resp.Status.GetCode()),
			Message: resp.GetStatus().GetMessage(),
		}
	}

	hookStatus := MessageHookPointsStatus_OK
	if err != nil {
		hookStatus = MessageHookPointsStatus_ERROR
	}
	p.cli.doAfter(hookPoint, commons, elapsed, hookStatus)
	return err
}
// onRecoverOrphanedTransactionCommand handles a recover-orphaned-transaction
// command received from endpoints.
//
// When no transaction checker is registered an error is returned. Otherwise
// the message is converted to a MessageView and, in a new goroutine, the
// checker decides the resolution, which is then reported through
// endTransaction; a failure there is logged. The method itself returns nil
// without waiting for the goroutine.
func (p *defaultProducer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) error {
	transactionId := command.GetTransactionId()
	messageId := command.GetMessage().GetSystemProperties().MessageId
	// Use the same checker reference for the nil check and for the call.
	checker := p.checker
	if checker == nil {
		return fmt.Errorf("no transaction checker registered, ignore it, messageId=%s, transactionId=%s, endpoints=%v", messageId, transactionId, endpoints)
	}
	messageView := fromProtobuf_MessageView0(command.Message)
	go func(mv *MessageView) {
		resolution := checker.Check(mv)
		err := p.endTransaction(context.TODO(), endpoints,
			mv.GetMessageCommon(), messageId, transactionId, resolution)
		if err != nil {
			// %w is only meaningful in fmt.Errorf; a logger needs %v.
			p.cli.log.Errorf("exception raised while ending the transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%v", messageId, transactionId, endpoints, err)
		}
	}(messageView)
	return nil
}
// onVerifyMessageCommand is a no-op for the producer: both arguments are
// ignored and nil is always returned.
func (p *defaultProducer) onVerifyMessageCommand(_ *v2.Endpoints, _ *v2.VerifyMessageCommand) error {
	return nil
}