Fix reconsume broken while using non-FQDN topics (#386)
### Issue
Retry policy not effective with non-FQDN topic.
- reproduction
```go
client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-01",
SubscriptionName: "my-sub",
RetryEnable: true,
DLQ: &pulsar.DLQPolicy{MaxDeliveries: 2},
})
msg, _ := consumer.Receive(context.Background())
consumer.ReconsumeLater(msg, 5*time.Second)
```
- logs
```
RN[0000] consumer of topic [persistent://public/default/topic-01] not exist unexpectedly topic="[topic-01 persistent://public/default/my-sub-RETRY]"
```
### Cause
For MultiTopicConsumer `consumers` map filed:
- key: user provided topic, maybe non-FQDN.
- value: consumer instance.
`ReconsumeLater` using msg's FQDN topic as key to find `consumer` in `consumers`,
if mismatch with non-FQDN topic, this invoke will be ignored, lead to Retry policy not effective.
### Modifications
- Normalize user provided topics as FQDN topics before initializing consumers.
- Add non-FQDN topic consumption case in Retry policy tests.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index dceb1e2..8ae9161 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -160,6 +160,8 @@
return nil, err
}
+ // normalize as FQDN topics
+ var tns []*internal.TopicName
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
topic := options.Topic
@@ -167,17 +169,20 @@
topic = options.Topics[0]
}
- if err := validateTopicNames(topic); err != nil {
+ if tns, err = validateTopicNames(topic); err != nil {
return nil, err
}
-
+ topic = tns[0].Name
return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
}
if len(options.Topics) > 1 {
- if err := validateTopicNames(options.Topics...); err != nil {
+ if tns, err = validateTopicNames(options.Topics...); err != nil {
return nil, err
}
+ for i := range options.Topics {
+ options.Topics[i] = tns[i].Name
+ }
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 97d2de2..f31bb69 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1147,7 +1147,7 @@
}
func TestRLQ(t *testing.T) {
- topic := "persistent://public/default/" + newTopicName()
+ topic := newTopicName()
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
@@ -1243,7 +1243,7 @@
func TestRLQMultiTopics(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
- topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+ topic02 := fmt.Sprintf("topic-%d-2", now)
topics := []string{topic01, topic02}
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
@@ -1270,7 +1270,7 @@
// subscribe DLQ Topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
- Topic: "persistent://public/default/" + subName + "-DLQ",
+ Topic: subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
diff --git a/pulsar/helper.go b/pulsar/helper.go
index 0f8cf20..fb42cb5 100644
--- a/pulsar/helper.go
+++ b/pulsar/helper.go
@@ -53,15 +53,16 @@
return msg
}
-func validateTopicNames(topics ...string) error {
- var errs error
- for _, t := range topics {
- if _, err := internal.ParseTopicName(t); err != nil {
- errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t)
+func validateTopicNames(topics ...string) ([]*internal.TopicName, error) {
+ tns := make([]*internal.TopicName, len(topics))
+ for i, t := range topics {
+ tn, err := internal.ParseTopicName(t)
+ if err != nil {
+ return nil, pkgerrors.Wrapf(err, "invalid topic name: %s", t)
}
+ tns[i] = tn
}
-
- return errs
+ return tns, nil
}
func toKeyValues(metadata map[string]string) []*pb.KeyValue {