# PIP-35: Improve topic lookup for topics that have high number of partitions
* **Status**: Proposal
* **Author**: Zhengxin Cai, Lin Lin
* **Pull Request**:
* **Mailing List discussion**: http://mail-archives.apache.org/mod_mbox/pulsar-dev/201904.mbox/%3CCAHQi0xemFAGC6P9e240-8Ho0z4qtUv0uSBLtHfSs5ZeBDJXEug%40mail.gmail.com%3E
## Motivation
As this [GitHub issue](https://github.com/apache/pulsar/issues/1088) points out, a client currently has to send a separate lookup request for each partition of a partitioned topic. This proposal aims to simplify lookup for partitioned topics.
One option is to make the current lookup (`getBroker`) API smarter, so that it checks whether the topic is partitioned. For a normal topic it returns the normal lookup response, a `Pair<InetSocketAddress, InetSocketAddress>`; for a partitioned topic it returns a `List<Pair<InetSocketAddress, InetSocketAddress>>` containing all brokers that serve that partitioned topic.
The other option is to add a new batch lookup API, such as `getBrokers`. It takes a list of topics as input, where each topic must be either a non-partitioned topic or a single partition of a partitioned topic, and returns a `Map<TopicName, Pair<InetSocketAddress, InetSocketAddress>>` that maps each input topic to its broker addresses.
We prefer the second approach: it supports lookup for all partitions of a partitioned topic, and it also supports bulk lookup of many normal topics in one request.
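To make the shape of the preferred batch API concrete, here is a minimal sketch. Everything in it is hypothetical: `BatchLookup`, `BrokerAddresses` and the stub brokers are illustrative names, topics are plain strings rather than `TopicName`, and the two address fields are assumed to be the logical and physical broker addresses carried by the `Pair` above.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class BatchLookupApiSketch {
    /** Stand-in for the Pair<InetSocketAddress, InetSocketAddress> in the text (assumed roles). */
    static final class BrokerAddresses {
        final InetSocketAddress logical;
        final InetSocketAddress physical;

        BrokerAddresses(InetSocketAddress logical, InetSocketAddress physical) {
            this.logical = logical;
            this.physical = physical;
        }
    }

    /** Hypothetical batch API: one request, one map entry per input topic. */
    interface BatchLookup {
        CompletableFuture<Map<String, BrokerAddresses>> getBrokers(List<String> topics);
    }

    /** Toy implementation that alternates topics between two made-up brokers. */
    static BatchLookup twoBrokerStub() {
        InetSocketAddress a = InetSocketAddress.createUnresolved("broker-a", 6650);
        InetSocketAddress b = InetSocketAddress.createUnresolved("broker-b", 6650);
        return topics -> {
            Map<String, BrokerAddresses> result = new LinkedHashMap<>();
            for (int i = 0; i < topics.size(); i++) {
                InetSocketAddress addr = (i % 2 == 0) ? a : b;
                result.put(topics.get(i), new BrokerAddresses(addr, addr));
            }
            return CompletableFuture.completedFuture(result);
        };
    }

    public static void main(String[] args) {
        // Each partition of a partitioned topic is passed as its own topic name.
        List<String> partitions = List.of(
                "persistent://tenant/ns/orders-partition-0",
                "persistent://tenant/ns/orders-partition-1",
                "persistent://tenant/ns/orders-partition-2");
        twoBrokerStub().getBrokers(partitions).join()
                .forEach((topic, addrs) ->
                        System.out.println(topic + " -> " + addrs.logical.getHostString()));
    }
}
```

A caller that wants every partition of a partitioned topic would expand the partition names itself and pass them in one call, which is what keeps the same API usable for unrelated normal topics.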
## Proposal
The core lookup logic lives in `TopicLookupBase`.
Add a new `lookupTopicBatchAsync` method. After validation, the method first finds the bundle each topic belongs to and builds a `Map<Bundle, List<TopicName>>`, then invokes `NamespaceService.findBrokerServiceUrl` once per bundle.
It then iterates over the map and creates a response for each topic name.
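The grouping step can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the broker code: `bundleOf` is a hypothetical hash-modulo stand-in for the real bundle assignment, topics and brokers are plain strings, and `findBroker` is a caller-supplied function standing in for the per-bundle broker lookup.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class GroupByBundleSketch {
    /** Hypothetical bundle assignment: hash of the topic name modulo the bundle count. */
    static String bundleOf(String topic, int bundleCount) {
        return "bundle-" + Math.floorMod(topic.hashCode(), bundleCount);
    }

    /** Groups the requested topics by the bundle they fall into. */
    static Map<String, List<String>> groupByBundle(List<String> topics, int bundleCount) {
        Map<String, List<String>> byBundle = new TreeMap<>();
        for (String topic : topics) {
            byBundle.computeIfAbsent(bundleOf(topic, bundleCount), k -> new ArrayList<>()).add(topic);
        }
        return byBundle;
    }

    /** Issues one broker lookup per bundle, then fans the answer out to every topic in it. */
    static CompletableFuture<Map<String, String>> lookupBatch(
            List<String> topics, int bundleCount,
            Function<String, CompletableFuture<String>> findBroker) {
        List<CompletableFuture<Map<String, String>>> perBundle = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groupByBundle(topics, bundleCount).entrySet()) {
            perBundle.add(findBroker.apply(entry.getKey()).thenApply(broker -> {
                Map<String, String> part = new LinkedHashMap<>();
                for (String topic : entry.getValue()) {
                    part.put(topic, broker);
                }
                return part;
            }));
        }
        // Complete only when every per-bundle lookup has completed, then merge the parts.
        return CompletableFuture.allOf(perBundle.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<String, String> all = new LinkedHashMap<>();
                    perBundle.forEach(f -> all.putAll(f.join()));
                    return all;
                });
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders-partition-0", "orders-partition-1", "orders-partition-2");
        lookupBatch(topics, 2, bundle -> CompletableFuture.completedFuture("broker-for-" + bundle))
                .join()
                .forEach((topic, broker) -> System.out.println(topic + " -> " + broker));
    }
}
```

The point of grouping first is that the number of broker lookups is bounded by the number of bundles touched, not by the number of topics in the request.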
In `LookupService`, when `findBrokers` invokes `newBatchLookup`, it receives a list of lookup responses, some of which may contain a redirect. For those responses it invokes `findBrokers` again to contact the correct cluster, then combines the results of the two invocations into the final result.
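The redirect handling described above could look roughly like this on the client side. The sketch is synchronous for brevity and rests on assumptions: `LookupResult`, `BatchLookupClient` and the `maxRedirects` guard are hypothetical, and service URLs and topics are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RedirectFollowSketch {
    /** One entry of a batch lookup response: either a final broker or a redirect target. */
    static final class LookupResult {
        final String topic;
        final String url;        // broker URL, or the service URL to retry against
        final boolean redirect;

        LookupResult(String topic, String url, boolean redirect) {
            this.topic = topic;
            this.url = url;
            this.redirect = redirect;
        }
    }

    /** Hypothetical transport: sends one batch lookup to the given service URL. */
    interface BatchLookupClient {
        List<LookupResult> batchLookup(String serviceUrl, List<String> topics);
    }

    /** Resolves all topics, re-issuing the lookup for any that were redirected. */
    static Map<String, String> findBrokers(
            BatchLookupClient client, String serviceUrl, List<String> topics, int maxRedirects) {
        Map<String, String> resolved = new LinkedHashMap<>();
        Map<String, List<String>> redirected = new LinkedHashMap<>();
        for (LookupResult result : client.batchLookup(serviceUrl, topics)) {
            if (result.redirect) {
                // Batch redirected topics by target so each target gets a single follow-up request.
                redirected.computeIfAbsent(result.url, k -> new ArrayList<>()).add(result.topic);
            } else {
                resolved.put(result.topic, result.url);
            }
        }
        for (Map.Entry<String, List<String>> entry : redirected.entrySet()) {
            if (maxRedirects <= 0) {
                throw new IllegalStateException("Too many redirects for " + entry.getValue());
            }
            // Combine the follow-up results with the directly resolved ones.
            resolved.putAll(findBrokers(client, entry.getKey(), entry.getValue(), maxRedirects - 1));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Toy cluster layout: cluster-a redirects topics prefixed "b/" to cluster-b.
        BatchLookupClient client = (url, topics) -> {
            List<LookupResult> out = new ArrayList<>();
            for (String topic : topics) {
                boolean redirect = url.equals("cluster-a") && topic.startsWith("b/");
                out.add(new LookupResult(topic, redirect ? "cluster-b" : url + "-broker", redirect));
            }
            return out;
        };
        System.out.println(findBrokers(client, "cluster-a", List.of("a/t1", "b/t2"), 3));
    }
}
```

Bounding the number of redirects is a design choice of this sketch; the proposal itself only describes a single follow-up invocation.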
Here are the proposed changes to the protocol buffer definition and APIs:
*PulsarApi.proto*
```
Add
Type enum:
BATCH_LOOKUP = 23;
BATCH_LOOKUP_RESPONSE = 24;
BaseCommand:
optional CommandBatchLookupTopic lookupTopic = 23;
optional CommandBatchLookupTopicResponse lookupTopicResponse = 24;
CommandBatchLookupTopic:
{
repeated string topics = 1;
required uint64 request_id = 2;
}
CommandTopicResponse:
{
repeated string topicname = 10;
}
```
- Commands.java
  - Add newBatchLookup
- ClientCnx.java
  - Add newBatchLookup/handleBatchLookupResponse
- ServerCnx.java
  - Add handleBatchLookup
- LookupProxyHandler.java
  - Add handleBatchLookup
- ProxyConnection.java
  - Add handleBatchLookup
- PulsarDecoder.java
  - Handle batchLookup
- Add BatchLookupResult.java
- TopicLookupBase.java
  - Add lookupTopicBatchAsync
```
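// Pseudocode: declarations are left uninitialized, arguments are simplified,
// and error handling is omitted.
List<CompletableFuture<ByteBuf>> lookupTopicBatchAsync(List<TopicName> topics) {
    List<CompletableFuture<ByteBuf>> resultFutures;
    List<CompletableFuture<Void>> futures;
    // Bundle -> topics from the request that belong to that bundle
    Map<Bundle, List<TopicName>> map;

    topics.forEach(topic -> {
        CompletableFuture validateFuture;
        CompletableFuture getNameSpaceBundleFuture;
        getClusterDataIfDifferentCluster().thenAccept(validateFuture::complete);
        validateFuture.thenAccept(validationFailureResponse -> {
            if (validationFailureResponse != null) {
                // Validation failed: answer this topic directly, without a bundle lookup.
                CompletableFuture lookupFuture;
                lookupFuture.complete(validationFailureResponse);
                resultFutures.add(lookupFuture);
                getNameSpaceBundleFuture.complete();
            } else {
                pulsarService.getNamespaceService().getBundleAsync(topic)
                    .thenAccept(bundle -> {
                        map.computeIfAbsent(bundle, b -> new ArrayList<>()).add(topic);
                        getNameSpaceBundleFuture.complete();
                    });
            }
        });
        futures.add(getNameSpaceBundleFuture);
    });

    // Once every topic is assigned to a bundle, look up the owning broker once per bundle.
    FutureUtil.waitForAll(futures).thenRun(() -> map.entrySet().forEach(entry -> {
        CompletableFuture lookupFuture;
        namespaceService.findBrokerServiceUrl(entry.getKey()).thenAccept(lookupResult -> {
            // The response carries the broker address with a success or redirect flag,
            // plus the topic names from the lookup request that belong to this bundle.
            lookupFuture.complete(newLookupResponse());
        });
        resultFutures.add(lookupFuture);
    }));
    return resultFutures;
}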
```
- TopicLookup.java (v2)
  - Add lookupTopicBatchAsync
- BinaryProtoLookupService.java / HttpLookupService.java
  - Add getBrokers/findBrokers