fix retry policy not effective with partitioned topic (#425)
### Issue
Retry policy not effective with partitioned topic.
- reproduction
topic-01 have 2 partitions:
```go
client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-01",
SubscriptionName: "my-sub",
RetryEnable: true,
DLQ: &pulsar.DLQPolicy{MaxDeliveries: 2},
})
msg, _ := consumer.Receive(context.Background())
consumer.ReconsumeLater(msg, 5*time.Second)
```
- logs
```
WARN[0000] consumer of topic [persistent://public/default/topic-01-partition-0] not exist unexpectedly topic=" [persistent://public/default/topic-01 persistent://platform/psr/my-sub-RETRY]"
```
### Cause
For MultiTopicConsumer `consumers` map filed:
- key: FQDN topics, without partition number suffix.
- value: consumer instance.
`ReconsumeLater` using msg's partition number suffix topic as key, to find `consumer` in `consumers`, but allways not found, lead to Retry policy not effective.
### Modifications
- Trim partition number in partitioned topic before using it
### Verifying this change
- [x] Make sure that the change passes the CI checks.
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index e5ede99..479a12a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -24,6 +24,7 @@
"sync"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
pkgerrors "github.com/pkg/errors"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -136,7 +137,18 @@
}
func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
- consumer, ok := c.consumers[msg.Topic()]
+ names, err := validateTopicNames(msg.Topic())
+ if err != nil {
+ c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
+ return
+ }
+ if len(names) != 1 {
+ c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names)
+ return
+ }
+
+ fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0])
+ consumer, ok := c.consumers[fqdnTopic]
if !ok {
c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
return
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5b9b6e0..82a1f97 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1149,6 +1149,9 @@
func TestRLQ(t *testing.T) {
topic := newTopicName()
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
+ makeHTTPCall(t, http.MethodPut, testURL, "3")
+
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100