deduplicate user topics before using them (#426)
### Motivation
- Deduplicate user topics before using them, just like client-java's `TreeSet` topics implementation, see [ConsumerConfigurationData.java#L56](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L56)
### Modifications
- add `distinct` utility function to deduplicate user provided topics
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 370ea12..6379993 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -164,6 +164,7 @@
for i := range options.Topics {
options.Topics[i] = tns[i].Name
}
+ options.Topics = distinct(options.Topics)
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
@@ -555,6 +556,18 @@
return string(bytes)
}
+func distinct(fqdnTopics []string) []string {
+ set := make(map[string]struct{})
+ uniques := make([]string, 0, len(fqdnTopics))
+ for _, topic := range fqdnTopics {
+ if _, ok := set[topic]; !ok {
+ set[topic] = struct{}{}
+ uniques = append(uniques, topic)
+ }
+ }
+ return uniques
+}
+
func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
switch st {
case Exclusive: