As this GitHub issue points out, for a partitioned topic the client has to send a separate Lookup request for each partition. Here we propose a way to make lookup easier for partitioned topics.
One option is to make the current Lookup (getBroker) API smarter so that it checks whether the topic is partitioned. For a normal topic it returns the normal Lookup response, a Pair<InetSocketAddress, InetSocketAddress>; for a partitioned topic it returns a List<Pair<InetSocketAddress, InetSocketAddress>> containing all the brokers that serve that partitioned topic.
The other option is to create a new BatchLookup API, such as getBrokers. It takes a list of topics as input, where each topic must be either a non-partitioned topic or a single partition of a partitioned topic, and returns a Map<TopicName, Pair<InetSocketAddress, InetSocketAddress>> that maps each input topic to its corresponding address.
We prefer the second approach: it not only supports lookup for all partitions of a partitioned topic, it also supports bulk lookup for many normal topics in one request.
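To make the shape of the second option concrete, here is a minimal sketch of what a getBrokers-style API could look like from the caller's side. Everything in it is illustrative: the BatchLookup interface, the partitionNames helper, and the fixedBroker test double are hypothetical names, Map.Entry stands in for Pair, plain strings stand in for TopicName, and the "-partition-N" suffix is assumed to follow Pulsar's usual partition naming.

```java
import java.net.InetSocketAddress;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class BatchLookupApiSketch {

    /** Hypothetical batch lookup API: one request, one address pair per input topic. */
    interface BatchLookup {
        CompletableFuture<Map<String, Map.Entry<InetSocketAddress, InetSocketAddress>>>
                getBrokers(List<String> topics);
    }

    /** Expands a partitioned topic into the per-partition names the batch call expects. */
    static List<String> partitionNames(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            // Assumed naming convention: "<topic>-partition-<index>".
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    /** Toy in-memory implementation that maps every topic to the same broker. */
    static BatchLookup fixedBroker(InetSocketAddress broker) {
        return topics -> {
            Map<String, Map.Entry<InetSocketAddress, InetSocketAddress>> result =
                    new LinkedHashMap<>();
            for (String topic : topics) {
                result.put(topic, new SimpleImmutableEntry<>(broker, broker));
            }
            return CompletableFuture.completedFuture(result);
        };
    }
}
```

With this shape, a client that wants every partition of a topic expands the names once and issues a single getBrokers call, and the same call can mix partitions with unrelated non-partitioned topics.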
The core lookup logic lives in TopicLookupBase. Add a new method, lookupTopicBatchAsync. Within it, after validation, we first find the bundle each topic belongs to, then invoke NamespaceService.findBrokerServiceUrl once per bundle, building a Map<Bundle, List> that groups the topics by bundle. We then iterate through the map and create a response for each topic name.
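The grouping step can be sketched in isolation as follows. This is a simplified illustration, not the broker code: bundles, topics, and broker URLs are plain strings, validation is omitted, and the two Function parameters (bundleOf and brokerOf) are hypothetical stand-ins for the asynchronous bundle lookup and findBrokerServiceUrl calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class BundleGroupingSketch {

    /** Resolves a broker for every topic using one broker lookup per bundle. */
    static CompletableFuture<Map<String, String>> lookupBatch(
            List<String> topics,
            Function<String, CompletableFuture<String>> bundleOf,   // topic  -> bundle
            Function<String, CompletableFuture<String>> brokerOf) { // bundle -> broker URL

        // Phase 1: find the bundle of each topic and group topics by bundle.
        Map<String, List<String>> topicsByBundle = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> bundleFutures = new ArrayList<>();
        for (String topic : topics) {
            bundleFutures.add(bundleOf.apply(topic).thenAccept(bundle ->
                    topicsByBundle
                            .computeIfAbsent(bundle, b -> new CopyOnWriteArrayList<>())
                            .add(topic)));
        }

        // Phase 2: once all bundles are known, look up each bundle exactly once
        // and fan the answer out to every topic in that bundle.
        return CompletableFuture.allOf(bundleFutures.toArray(new CompletableFuture[0]))
                .thenCompose(ignored -> {
                    Map<String, String> brokerByTopic = new ConcurrentHashMap<>();
                    List<CompletableFuture<Void>> lookups = new ArrayList<>();
                    topicsByBundle.forEach((bundle, bundleTopics) ->
                            lookups.add(brokerOf.apply(bundle).thenAccept(broker ->
                                    bundleTopics.forEach(t -> brokerByTopic.put(t, broker)))));
                    return CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0]))
                            .thenApply(done -> brokerByTopic);
                });
    }
}
```

The point of grouping first is that the number of broker lookups scales with the number of distinct bundles rather than with the number of topics in the request.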
In LookupService, when findBrokers invokes newBatchLookup, it gets back a list of responses, some of which contain a redirect. For those responses it invokes findBrokers again to contact the correct cluster, then combines the results of the two invocations into the final result.
Here are the proposed changes to the protocol buffer definitions and APIs:
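A rough sketch of this redirect handling on the client side is shown below. The Result class and the BatchLookupClient interface are hypothetical simplifications (addresses and service URLs are plain strings), and the sketch assumes a redirect response carries the service URL to retry against. A real implementation would also want to cap the number of redirects it follows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class RedirectMergeSketch {

    /** Hypothetical per-topic response: a broker address, or a redirect target. */
    static final class Result {
        final String address;
        final boolean redirect;

        Result(String address, boolean redirect) {
            this.address = address;
            this.redirect = redirect;
        }
    }

    /** Hypothetical transport that sends one batch lookup to the given service URL. */
    interface BatchLookupClient {
        CompletableFuture<Map<String, Result>> newBatchLookup(String serviceUrl, List<String> topics);
    }

    static CompletableFuture<Map<String, String>> findBrokers(
            BatchLookupClient client, String serviceUrl, List<String> topics) {
        return client.newBatchLookup(serviceUrl, topics).thenCompose(responses -> {
            Map<String, String> resolved = new HashMap<>();
            Map<String, List<String>> redirected = new HashMap<>(); // target URL -> topics
            responses.forEach((topic, result) -> {
                if (result.redirect) {
                    redirected.computeIfAbsent(result.address, k -> new ArrayList<>()).add(topic);
                } else {
                    resolved.put(topic, result.address);
                }
            });

            // Re-issue one batch lookup per redirect target and merge into the final map.
            CompletableFuture<Map<String, String>> merged =
                    CompletableFuture.completedFuture(resolved);
            for (Map.Entry<String, List<String>> e : redirected.entrySet()) {
                merged = merged.thenCombine(
                        findBrokers(client, e.getKey(), e.getValue()),
                        (first, second) -> {
                            first.putAll(second);
                            return first;
                        });
            }
            return merged;
        });
    }
}
```

Grouping the redirected topics by target keeps the follow-up traffic to one extra request per redirect target instead of one per topic.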
PulsarApi.proto
Add to the Type enum:
    BATCH_LOOKUP = 23;
    BATCH_LOOKUP_RESPONSE = 24;
BaseCommand:
    optional CommandBatchLookupTopic lookupTopic = 23;
    optional CommandBatchLookupTopicResponse lookupTopicResponse = 24;
CommandBatchLookupTopic: {
    repeated string topics = 1;
    required uint64 request_id = 2;
}
CommandTopicResponse: {
    repeated string topicname = 10;
}
Commands.java
ClientCnx.java
ServiceCnx.java
LookupProxyHandler.java
ProxyConnection.java
PulsarDecoder.java
Add BatchLookupResult.java
TopicLookupBase.java
List<CompletableFuture<ByteBuf>> lookupTopicBatchAsync(List<Topic> topics) {
    List<CompletableFuture> resultFutures;
    List<CompletableFuture> futures;
    Map<Bundle.toString, List<Topic>> map;

    topics.forEach(topic -> {
        CompletableFuture validateFuture;
        CompletableFuture getNameSpaceBundleFuture;
        CompletableFuture lookupFuture;

        getClusterDataIfDifferentCluster().thenAccept(validateFuture::complete);
        validateFuture.thenAccept(validationFailureResponse -> {
            // A non-null response means validation failed for this topic.
            if (validationFailureResponse != null) {
                lookupFuture.complete(validationFailureResponse);
            }
            // Record which bundle the topic belongs to.
            pulsarService.getNamespaceService().getBundleAsync().thenAccept(bundle -> {
                map.get(bundle).add(topic);
                getNameSpaceBundleFuture.complete();
            });
        });
        futures.add(getNameSpaceBundleFuture);
    });

    // Once every topic has been assigned to a bundle, look up each bundle once.
    FutureUtil.waitForAll(futures).then(() -> map.entries.forEach(entry -> {
        entry.key.findBrokerServiceUrl().thenAccept(lookupResult -> {
            // The response carries the address with a success or redirect flag, plus the
            // list of topic names from the lookup request that belong to this bundle.
            lookupFuture.complete(newLookupResponse());
            resultFutures.add(lookupFuture);
        });
    }));
    return resultFutures;
}
TopicLookup.java(v2)
BinaryProto/HttpLookupService.java