blob: 0c31a1aafc27ef9ec5faa45552e8fbdb35cc69b1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
pkgerrors "github.com/pkg/errors"
)
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
AckID(id MessageID) error
AckIDWithResponse(id MessageID) error
AckIDWithTxn(msgID MessageID, txn Transaction) error
AckIDCumulative(msgID MessageID) error
AckIDWithResponseCumulative(msgID MessageID) error
NackID(id MessageID)
NackMsg(msg Message)
}
type consumer struct {
sync.Mutex
topic string
client *client
options ConsumerOptions
consumers []*partitionConsumer
consumerName string
disableForceTopicCreation bool
// channel used to deliver message to clients
messageCh chan ConsumerMessage
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()
log log.Logger
metrics *internal.LeveledMetrics
}
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
return nil, newError(TopicNotFound, "topic is required")
}
if options.SubscriptionName == "" {
return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
}
if options.ReceiverQueueSize <= 0 {
options.ReceiverQueueSize = defaultReceiverQueueSize
}
if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
}
if options.Name == "" {
options.Name = generateRandomName()
}
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
if options.Schema.GetSchemaInfo().Type == NONE {
options.Schema = NewBytesSchema(nil)
}
}
if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}
if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}
if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan ConsumerMessage, 10)
}
if options.RetryEnable {
usingTopic := ""
if options.Topic != "" {
usingTopic = options.Topic
} else if len(options.Topics) > 0 {
usingTopic = options.Topics[0]
}
tn, err := internal.ParseTopicName(usingTopic)
if err != nil {
return nil, err
}
topicName := internal.TopicNameWithoutPartitionPart(tn)
retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix
oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
retryTopic = oldRetryTopic
}
if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
dlqTopic = oldDlqTopic
}
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
}
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
}
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
}
}
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
} else if options.Topic == "" && len(options.Topics) > 0 {
options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
}
}
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log)
if err != nil {
return nil, err
}
// normalize as FQDN topics
var tns []*internal.TopicName
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
topic := options.Topic
if topic == "" {
topic = options.Topics[0]
}
if tns, err = validateTopicNames(topic); err != nil {
return nil, err
}
topic = tns[0].Name
err = addMessageCryptoIfMissing(client, &options, topic)
if err != nil {
return nil, err
}
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
}
if len(options.Topics) > 1 {
if tns, err = validateTopicNames(options.Topics...); err != nil {
return nil, err
}
for i := range options.Topics {
options.Topics[i] = tns[i].Name
}
options.Topics = distinct(options.Topics)
err = addMessageCryptoIfMissing(client, &options, options.Topics)
if err != nil {
return nil, err
}
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
if options.TopicsPattern != "" {
tn, err := internal.ParseTopicName(options.TopicsPattern)
if err != nil {
return nil, err
}
pattern, err := extractTopicPattern(tn)
if err != nil {
return nil, err
}
err = addMessageCryptoIfMissing(client, &options, tn.Name)
if err != nil {
return nil, err
}
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}
return nil, newError(InvalidTopicName, "topic name is required for consumer")
}
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
consumer := &consumer{
topic: topic,
client: client,
options: options,
disableForceTopicCreation: disableForceTopicCreation,
messageCh: messageCh,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
err := consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}
// set up timer to monitor for new partitions being added
duration := options.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
consumer.metrics.ConsumersOpened.Inc()
return consumer, nil
}
// Name returns the name of consumer.
func (c *consumer) Name() string {
return c.consumerName
}
func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopDiscoveryCh:
return
case <-ticker.C:
c.log.Debug("Auto discovering new partitions")
c.internalTopicSubscribeToPartitions()
}
}
}()
return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()
}
}
func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
return err
}
oldNumPartitions := 0
newNumPartitions := len(partitions)
c.Lock()
defer c.Unlock()
oldConsumers := c.consumers
oldNumPartitions = len(oldConsumers)
if oldConsumers != nil {
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not changed")
return nil
}
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}
c.consumers = make([]*partitionConsumer, newNumPartitions)
// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
if oldConsumers != nil && oldNumPartitions < newNumPartitions {
// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
c.consumers[i] = oldConsumers[i]
}
}
type ConsumerError struct {
err error
partition int
consumer *partitionConsumer
}
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
subProperties := c.options.SubscriptionProperties
startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
if partitionsToAdd < 0 {
partitionsToAdd = newNumPartitions
startPartition = 0
}
var wg sync.WaitGroup
ch := make(chan ConsumerError, partitionsToAdd)
wg.Add(partitionsToAdd)
for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
partitionTopic := partitions[partitionIdx]
go func(idx int, pt string) {
defer wg.Done()
var nackRedeliveryDelay time.Duration
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: c.options.startMessageID,
startMessageIDInclusive: c.options.StartMessageIDInclusive,
subscriptionMode: c.options.SubscriptionMode,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,
partition: idx,
consumer: cons,
}
}(partitionIdx, partitionTopic)
}
go func() {
wg.Wait()
close(ch)
}()
for ce := range ch {
if ce.err != nil {
err = ce.err
} else {
c.consumers[ce.partition] = ce.consumer
}
}
if err != nil {
// Since there were some failures,
// cleanup all the partitions that succeeded in creating the consumer
for _, c := range c.consumers {
if c != nil {
c.Close()
}
}
return err
}
if newNumPartitions < oldNumPartitions {
c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
} else {
c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
}
return nil
}
func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
func (c *consumer) Unsubscribe() error {
c.Lock()
defer c.Unlock()
var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
errMsg += fmt.Sprintf("topic %s, subscription %s: %s", consumer.topic, c.Subscription(), err)
}
}
if errMsg != "" {
return fmt.Errorf(errMsg)
}
return nil
}
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (c *consumer) AckWithTxn(msg Message, txn Transaction) error {
msgID := msg.ID()
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
return c.consumers[msgID.PartitionIdx()].AckIDWithTxn(msgID, txn)
}
// Chan return the message chan to users
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
// Ack the consumption of a single message
func (c *consumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
// AckID the consumption of a single message, identified by its MessageID
func (c *consumer) AckID(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
if c.options.AckWithResponse {
return c.consumers[msgID.PartitionIdx()].AckIDWithResponse(msgID)
}
return c.consumers[msgID.PartitionIdx()].AckID(msgID)
}
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
func (c *consumer) AckCumulative(msg Message) error {
return c.AckIDCumulative(msg.ID())
}
// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
func (c *consumer) AckIDCumulative(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
if c.options.AckWithResponse {
return c.consumers[msgID.PartitionIdx()].AckIDWithResponseCumulative(msgID)
}
return c.consumers[msgID.PartitionIdx()].AckIDCumulative(msgID)
}
// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}
// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
if delay < 0 {
delay = 0
}
if !checkMessageIDType(msg.ID()) {
c.log.Warnf("invalid message id type %T", msg.ID())
return
}
msgID := c.messageID(msg.ID())
if msgID == nil {
return
}
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
}
for k, v := range customProperties {
props[k] = v
}
reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
reconsumeTimes++
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: props,
DeliverAfter: delay,
},
}
}
}
func (c *consumer) Nack(msg Message) {
if !checkMessageIDType(msg.ID()) {
c.log.Warnf("invalid message id type %T", msg.ID())
return
}
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
mid := c.messageID(msg.ID())
if mid == nil {
return
}
if mid.consumer != nil {
mid.NackByMsg(msg)
return
}
c.consumers[mid.partitionIdx].NackMsg(msg)
return
}
c.NackID(msg.ID())
}
func (c *consumer) NackID(msgID MessageID) {
if err := c.checkMsgIDPartition(msgID); err != nil {
return
}
c.consumers[msgID.PartitionIdx()].NackID(msgID)
}
func (c *consumer) Close() {
c.closeOnce.Do(func() {
c.stopDiscovery()
c.Lock()
defer c.Unlock()
var wg sync.WaitGroup
for i := range c.consumers {
wg.Add(1)
go func(pc *partitionConsumer) {
defer wg.Done()
pc.Close()
}(c.consumers[i])
}
wg.Wait()
close(c.closeCh)
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
c.metrics.ConsumersClosed.Inc()
c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
})
}
func (c *consumer) Seek(msgID MessageID) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
}
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
return err
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}
return nil
}
func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error
// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
}
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}
return errs
}
func (c *consumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
if partition < 0 || int(partition) >= len(c.consumers) {
c.log.Errorf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
}
return nil
}
func (c *consumer) hasNext() bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure all paths cancel the context to avoid context leak
var wg sync.WaitGroup
wg.Add(len(c.consumers))
hasNext := make(chan bool)
for _, pc := range c.consumers {
pc := pc
go func() {
defer wg.Done()
if pc.hasNext() {
select {
case hasNext <- true:
case <-ctx.Done():
}
}
}()
}
go func() {
wg.Wait()
close(hasNext) // Close the channel after all goroutines have finished
}()
// Wait for either a 'true' result or for all goroutines to finish
for hn := range hasNext {
if hn {
return true
}
}
return false
}
func (c *consumer) setLastDequeuedMsg(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID)
return nil
}
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}
type random struct {
sync.Mutex
R *rand.Rand
}
func generateRandomName() string {
r.Lock()
defer r.Unlock()
chars := "abcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, 5)
for i := range bytes {
bytes[i] = chars[r.R.Intn(len(chars))]
}
return string(bytes)
}
func distinct(fqdnTopics []string) []string {
set := make(map[string]struct{})
uniques := make([]string, 0, len(fqdnTopics))
for _, topic := range fqdnTopics {
if _, ok := set[topic]; !ok {
set[topic] = struct{}{}
uniques = append(uniques, topic)
}
}
return uniques
}
func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
switch st {
case Exclusive:
return pb.CommandSubscribe_Exclusive
case Shared:
return pb.CommandSubscribe_Shared
case Failover:
return pb.CommandSubscribe_Failover
case KeyShared:
return pb.CommandSubscribe_Key_Shared
}
return pb.CommandSubscribe_Exclusive
}
func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_InitialPosition {
switch p {
case SubscriptionPositionLatest:
return pb.CommandSubscribe_Latest
case SubscriptionPositionEarliest:
return pb.CommandSubscribe_Earliest
}
return pb.CommandSubscribe_Latest
}
func (c *consumer) messageID(msgID MessageID) *trackingMessageID {
mid := toTrackingMessageID(msgID)
partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return nil
}
return mid
}
func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
// decryption is enabled, use default messagecrypto if not provided
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
false,
client.log.SubLogger(log.Fields{"topic": topics}))
if err != nil {
return err
}
options.Decryption.MessageCrypto = messageCrypto
}
return nil
}