// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
pkgerrors ""
const (
	// defaultAutoDiscoveryDuration is the topic re-discovery interval used
	// when ConsumerOptions.AutoDiscoveryPeriod is not a positive duration.
	defaultAutoDiscoveryDuration = 1 * time.Minute
)
type regexConsumer struct {
client *client
dlq *dlqRouter
rlq *retryRouter
options ConsumerOptions
messageCh chan ConsumerMessage
namespace string
pattern *regexp.Regexp
consumersLock sync.Mutex
consumers map[string]Consumer
closeOnce sync.Once
closeCh chan struct{}
ticker *time.Ticker
log log.Logger
consumerName string
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := &regexConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,
namespace: tn.Namespace,
pattern: pattern,
consumers: make(map[string]Consumer),
closeCh: make(chan struct{}),
log: c.log.SubLogger(log.Fields{"topic": tn.Name}),
consumerName: opts.Name,
topics, err := rc.topics()
if err != nil {
return nil, err
var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
rc.consumers[ce.topic] = ce.consumer
if errs != nil {
for _, c := range rc.consumers {
return nil, errs
// set up timer
duration := opts.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
rc.ticker = time.NewTicker(duration)
go rc.monitor()
return rc, nil
func (c *regexConsumer) Subscription() string {
return c.options.SubscriptionName
func (c *regexConsumer) Unsubscribe() error {
var errs error
defer c.consumersLock.Unlock()
for topic, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
msg := fmt.Sprintf("unable to unsubscribe from topic=%s subscription=%s",
topic, c.Subscription())
errs = pkgerrors.Wrap(err, msg)
return errs
func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
// Chan return the messages chan to user
func (c *regexConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
// Ack the consumption of a single message
func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
// AckID the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) error {
if !checkMessageIDType(msgID) {
c.log.Warnf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
mid := toTrackingMessageID(msgID)
if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
return errors.New("consumer is nil in consumer_regex")
if c.options.AckWithResponse {
return mid.consumer.AckIDWithResponse(msgID)
return mid.consumer.AckID(msgID)
// AckID the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckWithTxn(msg Message, txn Transaction) error {
msgID := msg.ID()
if !checkMessageIDType(msgID) {
c.log.Warnf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
mid := toTrackingMessageID(msgID)
if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
return errors.New("consumer is nil in consumer_regex")
return mid.consumer.AckIDWithTxn(msgID, txn)
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
func (c *regexConsumer) AckCumulative(msg Message) error {
return c.AckIDCumulative(msg.ID())
// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
func (c *regexConsumer) AckIDCumulative(msgID MessageID) error {
if !checkMessageIDType(msgID) {
c.log.Warnf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
mid := toTrackingMessageID(msgID)
if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
return errors.New("unable to ack message because consumer is nil")
if c.options.AckWithResponse {
return mid.consumer.AckIDWithResponseCumulative(msgID)
return mid.consumer.AckIDCumulative(msgID)
// Nack handles a negative acknowledgement for msg.
//
// When a nack backoff policy is configured (EnableDefaultNackBackoffPolicy
// or a non-nil NackBackoffPolicy) the message's ID is validated with
// checkMessageIDType and converted with toTrackingMessageID, and a warning is
// logged if the type is invalid or the ID carries no consumer.
//
// NOTE(review): as visible here the block is incomplete - the guards log but
// do not return, no call hands msg to the owning consumer, nothing handles
// the case where no backoff policy is configured, and the closing braces are
// absent. Confirm the intended nack calls against the tracking message ID /
// per-topic consumer API before relying on this method.
func (c *regexConsumer) Nack(msg Message) {
// Backoff-aware path: needs the message itself, not only its ID.
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
msgID := msg.ID()
if !checkMessageIDType(msgID) {
c.log.Warnf("invalid message id type %T", msgID)
mid := toTrackingMessageID(msgID)
// A nil consumer means the originating topic cannot be determined.
if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
func (c *regexConsumer) NackID(msgID MessageID) {
if !checkMessageIDType(msgID) {
c.log.Warnf("invalid message id type %T", msgID)
mid := toTrackingMessageID(msgID)
if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
func (c *regexConsumer) Close() {
c.closeOnce.Do(func() {
var wg sync.WaitGroup
defer c.consumersLock.Unlock()
for _, con := range c.consumers {
go func(consumer Consumer) {
defer wg.Done()
func (c *regexConsumer) Seek(_ MessageID) error {
return newError(SeekFailed, "seek command not allowed for regex consumer")
func (c *regexConsumer) SeekByTime(_ time.Time) error {
return newError(SeekFailed, "seek command not allowed for regex consumer")
// Name returns the name of consumer.
func (c *regexConsumer) Name() string {
return c.consumerName
func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
return true
return false
func (c *regexConsumer) monitor() {
for {
select {
case <-c.closeCh:
case <-c.ticker.C:
c.log.Debug("Auto discovering topics")
if !c.closed() {
func (c *regexConsumer) discover() {
topics, err := c.topics()
if err != nil {
c.log.WithError(err).Errorf("Failed to discover topics")
known := c.knownTopics()
newTopics := topicsDiff(topics, known)
staleTopics := topicsDiff(known, topics)
"new_topics": newTopics,
"old_topics": staleTopics,
Debug("discover topics")
if len(staleTopics) > 0 {
if len(newTopics) > 0 {
c.subscribe(newTopics, c.dlq, c.rlq)
func (c *regexConsumer) knownTopics() []string {
defer c.consumersLock.Unlock()
topics := make([]string, len(c.consumers))
n := 0
for t := range c.consumers {
topics[n] = t
return topics
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
c.log.WithField("topics", topics).Debug("subscribe")
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
if ce.err != nil {
c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
} else {
consumers[ce.topic] = ce.consumer
defer c.consumersLock.Unlock()
for t, consumer := range consumers {
c.consumers[t] = consumer
func (c *regexConsumer) unsubscribe(topics []string) {
c.log.WithField("topics", topics).Debug("unsubscribe")
consumers := make(map[string]Consumer, len(topics))
for _, t := range topics {
if consumer, ok := c.consumers[t]; ok {
consumers[t] = consumer
delete(c.consumers, t)
for t, consumer := range consumers {
c.log.Debugf("unsubscribe from topic=%s subscription=%s", t, c.options.SubscriptionName)
if err := consumer.Unsubscribe(); err != nil {
c.log.Warnf("unable to unsubscribe from topic=%s subscription=%s",
t, c.options.SubscriptionName)
func (c *regexConsumer) topics() ([]string, error) {
topics, err := c.client.lookupService.GetTopicsOfNamespace(c.namespace, internal.Persistent)
if err != nil {
return nil, err
filtered := filterTopics(topics, c.pattern)
return filtered, nil
type consumerError struct {
err error
topic string
consumer Consumer
func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
var wg sync.WaitGroup
go func() {
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
consumer: c,
return consumerErrorCh
func filterTopics(topics []string, regex *regexp.Regexp) []string {
matches := make(map[string]bool)
matching := make([]string, 0)
for _, t := range topics {
tn, _ := internal.ParseTopicName(t)
topic := internal.TopicNameWithoutPartitionPart(tn)
if _, ok := matches[topic]; ok {
if regex.MatchString(topic) {
matches[topic] = true
matching = append(matching, topic)
return matching
// topicsDiff returns all topics in topics1 that are not in topics2, keeping
// the order (and any duplicates) of topics1. When topics2 is empty, topics1
// itself is returned without copying.
func topicsDiff(topics1 []string, topics2 []string) []string {
	if len(topics2) == 0 {
		return topics1
	}

	exclude := make(map[string]struct{}, len(topics2))
	for _, t := range topics2 {
		exclude[t] = struct{}{}
	}

	diff := make([]string, 0)
	for _, t := range topics1 {
		if _, found := exclude[t]; !found {
			diff = append(diff, t)
		}
	}

	return diff
}
func extractTopicPattern(tn *internal.TopicName) (*regexp.Regexp, error) {
idx := strings.Index(tn.Name, tn.Namespace)
if idx > 0 {
return regexp.Compile(tn.Name[idx:])
return regexp.Compile(tn.Name)