The consumer needs to support subscribing to topics matched by a regular expression. In the first stage the scope is a single namespace: all topics and patterns must target the same namespace, which keeps authentication and authorization control simple.
In the end, we should add and implement a series of new methods in PulsarClient.java:
    Consumer subscribe(Collection<String> topics, String subscription);
    Consumer subscribe(Pattern topicsPattern, String subscription);
The goals that should be achieved are listed below; we can achieve them one by one:
This will need a new implementation of ConsumerBase that wraps multiple single-topic consumers; let's name it TopicsConsumerImpl. When the user calls the new method Consumer subscribe(Collection<String> topics, String subscription), it will iteratively create a ConsumerImpl for each topic and return a TopicsConsumerImpl. The main work is:
The TopicsConsumerImpl class should implement the abstract methods in ConsumerBase. It should also provide some specific members such as:
    // maintain a map of all the <Topic, Consumer> pairs after we subscribe to all the topics.
    private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
    // get topics
    Set<String> getTopics();
    // get consumers
    List<ConsumerImpl> getConsumers();
    // subscribe a topic
    void subscribeTopic(String topic);
    // unSubscribe a topic
    void unSubscribeTopic(String topic);
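The bookkeeping described above can be illustrated with a minimal sketch. It is not the proposed implementation: `StubConsumer` is a hypothetical stand-in for ConsumerImpl, `TopicsConsumerSketch` does not extend ConsumerBase, and the choice to close a consumer when its topic is unsubscribed is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ConsumerImpl; the real class talks to a broker.
class StubConsumer {
    final String topic;
    volatile boolean closed = false;

    StubConsumer(String topic) {
        this.topic = topic;
    }

    void close() {
        closed = true;
    }
}

// Sketch of the <Topic, Consumer> bookkeeping only; it does not extend ConsumerBase.
class TopicsConsumerSketch {
    private final ConcurrentMap<String, StubConsumer> consumers = new ConcurrentHashMap<>();

    // Live view of the currently subscribed topic names.
    Set<String> getTopics() {
        return consumers.keySet();
    }

    // Snapshot copy of the per-topic consumers.
    List<StubConsumer> getConsumers() {
        return new ArrayList<>(consumers.values());
    }

    // Creates a consumer only if the topic is not subscribed yet.
    void subscribeTopic(String topic) {
        consumers.computeIfAbsent(topic, StubConsumer::new);
    }

    // Removes the consumer for the topic, if any, and closes it (assumed behavior).
    void unSubscribeTopic(String topic) {
        StubConsumer removed = consumers.remove(topic);
        if (removed != null) {
            removed.close();
        }
    }
}
```

Keying the map by topic name makes a repeated subscribeTopic call for the same topic a no-op, which is one way to keep the wrapper idempotent.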
Changes may also be needed in ConsumerBase, because we have to extend MessageId with an additional String topic or consumer id. Alternatively, we may need to change MessageIdData in PulsarApi.proto.

As mentioned before, the scope is a namespace. The main work is:
The TopicsConsumerImpl class needs to keep the Pattern that was passed in from the API for the subscription.
Use getList to get a list of topics. In the interface PersistentTopics:
    List<String> getList(String namespace) throws PulsarAdminException;
    List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
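As a rough sketch of how the kept Pattern and the namespace topic list fit together, the helper below filters a list of topic names by a regular expression. The class name `TopicPatternFilter` is hypothetical, the input list stands in for whatever getList returns, and matching against the full topic name with `Matcher.matches()` is an assumption rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: filters namespace topics by the subscription pattern.
class TopicPatternFilter {
    // Returns the topics whose full name matches the pattern, in input order.
    static List<String> filter(List<String> namespaceTopics, Pattern topicsPattern) {
        List<String> matched = new ArrayList<>();
        for (String topic : namespaceTopics) {
            // matches() requires the whole topic name to match (assumed semantics).
            if (topicsPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

For example, with the pattern `persistent://prop/cluster/ns/orders-.*`, a topic named `persistent://prop/cluster/ns/orders-eu` would be kept and `persistent://prop/cluster/ns/payments` would be dropped.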
The implementation of Consumer subscribe(String namespace, Pattern topicsPattern, String subscription) should be like this: call List<String> getList(String namespace) to get all the topics; use topicsPattern to filter out the matched sub-topics-list; create a TopicsConsumerImpl with the sub-topics-list.

The main work is:
    interface TopicsChangeListener {
        // unsubscribe and delete the ConsumerImpl entries in the `consumers` map in `TopicsConsumerImpl` for the removed topics.
        void onTopicsRemoved(Collection<String> topics);
        // subscribe and create a list of new ConsumerImpl, and add them to the `consumers` map in `TopicsConsumerImpl`.
        void onTopicsAdded(Collection<String> topics);
    }
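One possible shape for a listener implementation is sketched below. It only illustrates the two callbacks keeping a topic-to-consumer map in sync: `MapUpdatingListener` is a hypothetical name, a plain String stands in for ConsumerImpl, and the interface is repeated so that the block compiles on its own.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Repeated from the text above so the sketch is self-contained.
interface TopicsChangeListener {
    void onTopicsRemoved(Collection<String> topics);
    void onTopicsAdded(Collection<String> topics);
}

// Hypothetical listener that keeps a topic -> consumer map in sync.
// A String stands in for ConsumerImpl.
class MapUpdatingListener implements TopicsChangeListener {
    final Map<String, String> consumers = new ConcurrentHashMap<>();

    @Override
    public void onTopicsRemoved(Collection<String> topics) {
        // Drop the entry for every removed topic; unknown topics are ignored.
        for (String topic : topics) {
            consumers.remove(topic);
        }
    }

    @Override
    public void onTopicsAdded(Collection<String> topics) {
        // Create an entry for every new topic; existing entries are kept as-is.
        for (String topic : topics) {
            consumers.putIfAbsent(topic, "consumer-for-" + topic);
        }
    }
}
```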
Add a method void registerListener(TopicsChangeListener listener) to TopicsConsumerImpl.
Call List<String> getList(String namespace) again to get a fresh topic list, and compare the filtered fresh sub-topics-list with the topics currently held in TopicsConsumerImpl to get 2 lists: newAddedTopicsList and removedTopicsList. Then call TopicsChangeListener.onTopicsAdded(newAddedTopicsList) and TopicsChangeListener.onTopicsRemoved(removedTopicsList) to subscribe and unsubscribe, and update the consumers map in TopicsConsumerImpl.

The changes will be mostly on the surface and on client side:
New methods in org.apache.pulsar.client.api.PulsarClient.java:
    Consumer subscribe(Collection<String> topics, String subscription);
    Consumer subscribe(Pattern topicsPattern, String subscription);
A new Consumer implementation, TopicsConsumerImpl, which is returned by the above subscribe methods.
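The topic-change detection step described earlier (comparing a fresh filtered topic list with the topics currently held) might be sketched as follows. `TopicsDiff` is a hypothetical helper, not part of the proposal, and using sets rather than lists for the two results is an assumption made to keep the comparison simple.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper that computes which topics appeared and which disappeared.
class TopicsDiff {
    final Set<String> newAddedTopics;
    final Set<String> removedTopics;

    TopicsDiff(Collection<String> currentTopics, Collection<String> freshTopics) {
        // Topics in the fresh list that are not subscribed yet.
        newAddedTopics = new HashSet<>(freshTopics);
        newAddedTopics.removeAll(currentTopics);

        // Topics that are subscribed but no longer in the fresh list.
        removedTopics = new HashSet<>(currentTopics);
        removedTopics.removeAll(freshTopics);
    }
}
```

The two sets could then be passed to onTopicsAdded and onTopicsRemoved respectively.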