blob: 517824d5040cc62cce22de2c1914e583932a7ac5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
pb ""
pkgerrors ""
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
AckID(id trackingMessageID) error
AckIDWithResponse(id trackingMessageID) error
NackID(id trackingMessageID)
NackMsg(msg Message)
type consumer struct {
topic string
client *client
options ConsumerOptions
consumers []*partitionConsumer
consumerName string
disableForceTopicCreation bool
// channel used to deliver message to clients
messageCh chan ConsumerMessage
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()
log log.Logger
metrics *internal.LeveledMetrics
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
return nil, newError(TopicNotFound, "topic is required")
if options.SubscriptionName == "" {
return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
if options.ReceiverQueueSize <= 0 {
options.ReceiverQueueSize = defaultReceiverQueueSize
if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
if options.Name == "" {
options.Name = generateRandomName()
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
if options.Schema.GetSchemaInfo().Type == NONE {
options.Schema = NewBytesSchema(nil)
if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan ConsumerMessage, 10)
if options.RetryEnable {
usingTopic := ""
if options.Topic != "" {
usingTopic = options.Topic
} else if len(options.Topics) > 0 {
usingTopic = options.Topics[0]
tn, err := internal.ParseTopicName(usingTopic)
if err != nil {
return nil, err
retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
} else if options.Topic == "" && len(options.Topics) > 0 {
options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
dlq, err := newDlqRouter(client, options.DLQ, client.log)
if err != nil {
return nil, err
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log)
if err != nil {
return nil, err
// normalize as FQDN topics
var tns []*internal.TopicName
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
topic := options.Topic
if topic == "" {
topic = options.Topics[0]
if tns, err = validateTopicNames(topic); err != nil {
return nil, err
topic = tns[0].Name
err = addMessageCryptoIfMissing(client, &options, topic)
if err != nil {
return nil, err
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
if len(options.Topics) > 1 {
if tns, err = validateTopicNames(options.Topics...); err != nil {
return nil, err
for i := range options.Topics {
options.Topics[i] = tns[i].Name
options.Topics = distinct(options.Topics)
err = addMessageCryptoIfMissing(client, &options, options.Topics)
if err != nil {
return nil, err
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
if options.TopicsPattern != "" {
tn, err := internal.ParseTopicName(options.TopicsPattern)
if err != nil {
return nil, err
pattern, err := extractTopicPattern(tn)
if err != nil {
return nil, err
err = addMessageCryptoIfMissing(client, &options, tn.Name)
if err != nil {
return nil, err
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
return nil, newError(InvalidTopicName, "topic name is required for consumer")
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
consumer := &consumer{
topic: topic,
client: client,
options: options,
disableForceTopicCreation: disableForceTopicCreation,
messageCh: messageCh,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
err := consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
// set up timer to monitor for new partitions being added
duration := options.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
return consumer, nil
// Name returns the name of consumer.
func (c *consumer) Name() string {
return c.consumerName
func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)
go func() {
defer wg.Done()
for {
select {
case <-stopDiscoveryCh:
case <-ticker.C:
c.log.Debug("Auto discovering new partitions")
return func() {
func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
return err
oldNumPartitions := 0
newNumPartitions := len(partitions)
defer c.Unlock()
oldConsumers := c.consumers
oldNumPartitions = len(oldConsumers)
if oldConsumers != nil {
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not changed")
return nil
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
c.consumers = make([]*partitionConsumer, newNumPartitions)
// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
if oldConsumers != nil && oldNumPartitions < newNumPartitions {
// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
c.consumers[i] = oldConsumers[i]
type ConsumerError struct {
err error
partition int
consumer *partitionConsumer
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
subProperties := c.options.SubscriptionProperties
startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
if partitionsToAdd < 0 {
partitionsToAdd = newNumPartitions
startPartition = 0
var wg sync.WaitGroup
ch := make(chan ConsumerError, partitionsToAdd)
for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
partitionTopic := partitions[partitionIdx]
go func(idx int, pt string) {
defer wg.Done()
var nackRedeliveryDelay time.Duration
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = c.options.NackRedeliveryDelay
opts := &partitionConsumerOpts{
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,
partition: idx,
consumer: cons,
}(partitionIdx, partitionTopic)
go func() {
for ce := range ch {
if ce.err != nil {
err = ce.err
} else {
c.consumers[ce.partition] = ce.consumer
if err != nil {
// Since there were some failures,
// cleanup all the partitions that succeeded in creating the consumer
for _, c := range c.consumers {
if c != nil {
return err
if newNumPartitions < oldNumPartitions {
} else {
return nil
func (c *consumer) Subscription() string {
return c.options.SubscriptionName
func (c *consumer) Unsubscribe() error {
defer c.Unlock()
var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
errMsg += fmt.Sprintf("topic %s, subscription %s: %s", consumer.topic, c.Subscription(), err)
if errMsg != "" {
return fmt.Errorf(errMsg)
return nil
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
// Chan return the message chan to users
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
// Ack the consumption of a single message
func (c *consumer) Ack(msg Message) error {
return c.AckID(msg.ID())
// AckID the consumption of a single message, identified by its MessageID
func (c *consumer) AckID(msgID MessageID) error {
mid, ok := c.messageID(msgID)
if !ok {
return errors.New("failed to convert trackingMessageID")
if mid.consumer != nil {
return mid.Ack()
if c.options.AckWithResponse {
return c.consumers[mid.partitionIdx].AckIDWithResponse(mid)
return c.consumers[mid.partitionIdx].AckID(mid)
// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
if delay < 0 {
delay = 0
msgID, ok := c.messageID(msg.ID())
if !ok {
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: props,
DeliverAfter: delay,
func (c *consumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
mid, ok := c.messageID(msg.ID())
if !ok {
if mid.consumer != nil {
func (c *consumer) NackID(msgID MessageID) {
mid, ok := c.messageID(msgID)
if !ok {
if mid.consumer != nil {
func (c *consumer) Close() {
c.closeOnce.Do(func() {
defer c.Unlock()
var wg sync.WaitGroup
for i := range c.consumers {
go func(pc *partitionConsumer) {
defer wg.Done()
func (c *consumer) Seek(msgID MessageID) error {
defer c.Unlock()
if len(c.consumers) > 1 {
return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
mid, ok := c.messageID(msgID)
if !ok {
return nil
return c.consumers[mid.partitionIdx].Seek(mid)
func (c *consumer) SeekByTime(time time.Time) error {
defer c.Unlock()
var errs error
// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
return errs
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
type random struct {
R *rand.Rand
func generateRandomName() string {
defer r.Unlock()
chars := "abcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, 5)
for i := range bytes {
bytes[i] = chars[r.R.Intn(len(chars))]
return string(bytes)
func distinct(fqdnTopics []string) []string {
set := make(map[string]struct{})
uniques := make([]string, 0, len(fqdnTopics))
for _, topic := range fqdnTopics {
if _, ok := set[topic]; !ok {
set[topic] = struct{}{}
uniques = append(uniques, topic)
return uniques
func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
switch st {
case Exclusive:
return pb.CommandSubscribe_Exclusive
case Shared:
return pb.CommandSubscribe_Shared
case Failover:
return pb.CommandSubscribe_Failover
case KeyShared:
return pb.CommandSubscribe_Key_Shared
return pb.CommandSubscribe_Exclusive
func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_InitialPosition {
switch p {
case SubscriptionPositionLatest:
return pb.CommandSubscribe_Latest
case SubscriptionPositionEarliest:
return pb.CommandSubscribe_Earliest
return pb.CommandSubscribe_Latest
func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return trackingMessageID{}, false
partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return trackingMessageID{}, false
return mid, true
func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
// decryption is enabled, use default messagecrypto if not provided
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
client.log.SubLogger(log.Fields{"topic": topics}))
if err != nil {
return err
options.Decryption.MessageCrypto = messageCrypto
return nil