| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package pulsar |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "regexp" |
| "strings" |
| "sync" |
| "time" |
| |
| pkgerrors "github.com/pkg/errors" |
| |
| log "github.com/sirupsen/logrus" |
| |
| "github.com/apache/pulsar-client-go/pulsar/internal" |
| ) |
| |
| const ( |
| defaultAutoDiscoveryDuration = 1 * time.Minute |
| ) |
| |
| type regexConsumer struct { |
| client *client |
| dlq *dlqRouter |
| |
| options ConsumerOptions |
| |
| messageCh chan ConsumerMessage |
| |
| namespace string |
| pattern *regexp.Regexp |
| |
| consumersLock sync.Mutex |
| consumers map[string]Consumer |
| subscribeCh chan []string |
| unsubscribeCh chan []string |
| |
| closeOnce sync.Once |
| closeCh chan struct{} |
| |
| ticker *time.Ticker |
| |
| log *log.Entry |
| |
| consumerName string |
| } |
| |
| func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, |
| msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) { |
| rc := ®exConsumer{ |
| client: c, |
| dlq: dlq, |
| options: opts, |
| messageCh: msgCh, |
| |
| namespace: tn.Namespace, |
| pattern: pattern, |
| |
| consumers: make(map[string]Consumer), |
| subscribeCh: make(chan []string, 1), |
| unsubscribeCh: make(chan []string, 1), |
| |
| closeCh: make(chan struct{}), |
| |
| log: log.WithField("topic", tn.Name), |
| consumerName: opts.Name, |
| } |
| |
| topics, err := rc.topics() |
| if err != nil { |
| return nil, err |
| } |
| |
| var errs error |
| for ce := range subscriber(c, topics, opts, msgCh, dlq) { |
| if ce.err != nil { |
| errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) |
| } else { |
| rc.consumers[ce.topic] = ce.consumer |
| } |
| } |
| |
| if errs != nil { |
| for _, c := range rc.consumers { |
| c.Close() |
| } |
| return nil, errs |
| } |
| |
| // set up timer |
| duration := opts.AutoDiscoveryPeriod |
| if duration <= 0 { |
| duration = defaultAutoDiscoveryDuration |
| } |
| rc.ticker = time.NewTicker(duration) |
| |
| go rc.monitor() |
| |
| consumersOpened.Inc() |
| return rc, nil |
| } |
| |
| func (c *regexConsumer) Subscription() string { |
| return c.options.SubscriptionName |
| } |
| |
| func (c *regexConsumer) Unsubscribe() error { |
| var errs error |
| c.consumersLock.Lock() |
| defer c.consumersLock.Unlock() |
| |
| for topic, consumer := range c.consumers { |
| if err := consumer.Unsubscribe(); err != nil { |
| msg := fmt.Sprintf("unable to unsubscribe from topic=%s subscription=%s", |
| topic, c.Subscription()) |
| errs = pkgerrors.Wrap(err, msg) |
| } |
| } |
| return errs |
| } |
| |
| func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) { |
| for { |
| select { |
| case <-c.closeCh: |
| return nil, ErrConsumerClosed |
| case cm, ok := <-c.messageCh: |
| if !ok { |
| return nil, ErrConsumerClosed |
| } |
| return cm.Message, nil |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| } |
| } |
| } |
| |
| // Chan |
| func (c *regexConsumer) Chan() <-chan ConsumerMessage { |
| return c.messageCh |
| } |
| |
| // Ack the consumption of a single message |
| func (c *regexConsumer) Ack(msg Message) { |
| c.AckID(msg.ID()) |
| } |
| |
| // Ack the consumption of a single message, identified by its MessageID |
| func (c *regexConsumer) AckID(msgID MessageID) { |
| mid, ok := toTrackingMessageID(msgID) |
| if !ok { |
| c.log.Warnf("invalid message id type %T", msgID) |
| return |
| } |
| |
| if mid.consumer == nil { |
| c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) |
| return |
| } |
| |
| mid.Ack() |
| } |
| |
| func (c *regexConsumer) Nack(msg Message) { |
| c.NackID(msg.ID()) |
| } |
| |
| func (c *regexConsumer) NackID(msgID MessageID) { |
| mid, ok := toTrackingMessageID(msgID) |
| if !ok { |
| c.log.Warnf("invalid message id type %T", msgID) |
| return |
| } |
| |
| if mid.consumer == nil { |
| c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) |
| return |
| } |
| |
| mid.Nack() |
| } |
| |
| func (c *regexConsumer) Close() { |
| c.closeOnce.Do(func() { |
| c.ticker.Stop() |
| close(c.closeCh) |
| |
| var wg sync.WaitGroup |
| c.consumersLock.Lock() |
| defer c.consumersLock.Unlock() |
| wg.Add(len(c.consumers)) |
| for _, con := range c.consumers { |
| go func(consumer Consumer) { |
| defer wg.Done() |
| consumer.Close() |
| }(con) |
| } |
| wg.Wait() |
| c.dlq.close() |
| consumersClosed.Inc() |
| }) |
| } |
| |
| func (c *regexConsumer) Seek(msgID MessageID) error { |
| return errors.New("seek command not allowed for regex consumer") |
| } |
| |
| func (c *regexConsumer) SeekByTime(time time.Time) error { |
| return errors.New("seek command not allowed for regex consumer") |
| } |
| |
| // Name returns the name of consumer. |
| func (c *regexConsumer) Name() string { |
| return c.consumerName |
| } |
| |
| func (c *regexConsumer) closed() bool { |
| select { |
| case <-c.closeCh: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| func (c *regexConsumer) monitor() { |
| for { |
| select { |
| case <-c.closeCh: |
| return |
| case <-c.ticker.C: |
| c.log.Debug("Auto discovering topics") |
| if !c.closed() { |
| c.discover() |
| } |
| case topics := <-c.subscribeCh: |
| if len(topics) > 0 && !c.closed() { |
| c.subscribe(topics, c.dlq) |
| } |
| case topics := <-c.unsubscribeCh: |
| if len(topics) > 0 && !c.closed() { |
| c.unsubscribe(topics) |
| } |
| } |
| } |
| } |
| |
| func (c *regexConsumer) discover() { |
| topics, err := c.topics() |
| if err != nil { |
| c.log.WithError(err).Errorf("Failed to discover topics") |
| return |
| } |
| known := c.knownTopics() |
| newTopics := topicsDiff(topics, known) |
| staleTopics := topicsDiff(known, topics) |
| |
| if log.GetLevel() == log.DebugLevel { |
| l := c.log.WithFields(log.Fields{ |
| "new_topics": newTopics, |
| "old_topics": staleTopics, |
| }) |
| l.Debug("discover topics") |
| } |
| |
| c.unsubscribeCh <- staleTopics |
| c.subscribeCh <- newTopics |
| } |
| |
| func (c *regexConsumer) knownTopics() []string { |
| c.consumersLock.Lock() |
| defer c.consumersLock.Unlock() |
| topics := make([]string, len(c.consumers)) |
| n := 0 |
| for t := range c.consumers { |
| topics[n] = t |
| n++ |
| } |
| |
| return topics |
| } |
| |
| func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) { |
| if log.GetLevel() == log.DebugLevel { |
| c.log.WithField("topics", topics).Debug("subscribe") |
| } |
| consumers := make(map[string]Consumer, len(topics)) |
| for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq) { |
| if ce.err != nil { |
| c.log.Warnf("Failed to subscribe to topic=%s", ce.topic) |
| } else { |
| consumers[ce.topic] = ce.consumer |
| } |
| } |
| |
| c.consumersLock.Lock() |
| defer c.consumersLock.Unlock() |
| for t, consumer := range consumers { |
| c.consumers[t] = consumer |
| } |
| } |
| |
| func (c *regexConsumer) unsubscribe(topics []string) { |
| if log.GetLevel() == log.DebugLevel { |
| c.log.WithField("topics", topics).Debug("unsubscribe") |
| } |
| consumers := make(map[string]Consumer, len(topics)) |
| c.consumersLock.Lock() |
| for _, t := range topics { |
| if consumer, ok := c.consumers[t]; ok { |
| consumers[t] = consumer |
| delete(c.consumers, t) |
| } |
| } |
| c.consumersLock.Unlock() |
| |
| for t, consumer := range consumers { |
| log.Debugf("unsubscribe from topic=%s subscription=%s", t, c.options.SubscriptionName) |
| if err := consumer.Unsubscribe(); err != nil { |
| c.log.Warnf("unable to unsubscribe from topic=%s subscription=%s", |
| t, c.options.SubscriptionName) |
| } |
| consumer.Close() |
| } |
| } |
| |
| func (c *regexConsumer) topics() ([]string, error) { |
| topics, err := c.client.namespaceTopics(c.namespace) |
| if err != nil { |
| return nil, err |
| } |
| |
| filtered := filterTopics(topics, c.pattern) |
| return filtered, nil |
| } |
| |
| type consumerError struct { |
| err error |
| topic string |
| consumer Consumer |
| } |
| |
| func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage, |
| dlq *dlqRouter) <-chan consumerError { |
| consumerErrorCh := make(chan consumerError, len(topics)) |
| var wg sync.WaitGroup |
| wg.Add(len(topics)) |
| go func() { |
| wg.Wait() |
| close(consumerErrorCh) |
| }() |
| |
| for _, t := range topics { |
| go func(topic string) { |
| defer wg.Done() |
| c, err := newInternalConsumer(c, opts, topic, ch, dlq, true) |
| consumerErrorCh <- consumerError{ |
| err: err, |
| topic: topic, |
| consumer: c, |
| } |
| }(t) |
| } |
| |
| return consumerErrorCh |
| } |
| |
| func filterTopics(topics []string, regex *regexp.Regexp) []string { |
| matches := make(map[string]bool) |
| matching := make([]string, 0) |
| for _, t := range topics { |
| tn, _ := internal.ParseTopicName(t) |
| topic := internal.TopicNameWithoutPartitionPart(tn) |
| if _, ok := matches[topic]; ok { |
| continue |
| } |
| if regex.MatchString(topic) { |
| matches[topic] = true |
| matching = append(matching, topic) |
| } |
| } |
| |
| return matching |
| } |
| |
| // topicDiff returns all topics in topics1 that are not in topics2 |
| func topicsDiff(topics1 []string, topics2 []string) []string { |
| if len(topics2) == 0 { |
| return topics1 |
| } |
| diff := make([]string, 0) |
| topics := make(map[string]bool, len(topics2)) |
| for _, t := range topics2 { |
| topics[t] = true |
| } |
| |
| for _, t := range topics1 { |
| if !topics[t] { |
| diff = append(diff, t) |
| } |
| } |
| |
| return diff |
| } |
| |
| func extractTopicPattern(tn *internal.TopicName) (*regexp.Regexp, error) { |
| idx := strings.Index(tn.Name, tn.Namespace) |
| if idx > 0 { |
| return regexp.Compile(tn.Name[idx:]) |
| } |
| |
| return regexp.Compile(tn.Name) |
| } |