Support multi-topic for MultiTopicsReaderImpl (#9995)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a12e9a6..11d44e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -30,6 +30,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -50,6 +51,7 @@
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -244,6 +246,47 @@
         publishMessages(topic,100,false);
     }
 
+    @Test(timeOut = 20000)
+    public void testMultiTopic() throws Exception {
+        final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+        final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic2, 3);
+        final String topic3 = "persistent://my-property/my-ns/topic3" + UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .startMessageId(MessageId.earliest)
+                .topics(topics).readerName("my-reader").create();
+        // create producer and send msg
+        List<Producer<String>> producerList = new ArrayList<>();
+        for (String topicName : topics) {
+            producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+        }
+        int msgNum = 10;
+        Set<String> messages = new HashSet<>();
+        for (int i = 0; i < producerList.size(); i++) {
+            Producer<String> producer = producerList.get(i);
+            for (int j = 0; j < msgNum; j++) {
+                String msg = i + "msg" + j;
+                producer.send(msg);
+                messages.add(msg);
+            }
+        }
+        // receive messages
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext();
+            assertTrue(messages.remove(message.getValue()));
+        }
+        assertEquals(messages.size(), 0);
+        assertEquals(client.consumersCount(), 1);
+        // clean up
+        for (Producer<String> producer : producerList) {
+            producer.close();
+        }
+        reader.close();
+        Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+    }
+
     @Test(timeOut = 10000)
     public void testKeyHashRangeReader() throws Exception {
         final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 045f0ef..a84208b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -110,6 +111,13 @@
     ReaderBuilder<T> topic(String topicName);
 
     /**
+     * Specify topics this reader will read from.
+     * @param topicNames
+     * @return
+     */
+    ReaderBuilder<T> topics(List<String> topicNames);
+
+    /**
      * The initial reader positioning is done by specifying a message id. The options are:
      * <ul>
      * <li>{@link MessageId#earliest}: Start reading from the earliest message available in the topic</li>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index f2e9f8a..b9593bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -55,13 +55,12 @@
             subscription = readerConfiguration.getSubscriptionName();
         }
         ConsumerConfigurationData<T> consumerConfiguration = new ConsumerConfigurationData<>();
-        consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
+        consumerConfiguration.getTopicNames().addAll(readerConfiguration.getTopicNames());
         consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
         consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
-        consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
 
         if (readerConfiguration.getReaderListener() != null) {
             ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0790e75..684a64b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -510,24 +510,18 @@
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
-        return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
-            .thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone));
-    }
-
-    <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
-
         if (conf == null) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
         }
-
-        String topic = conf.getTopicName();
-
-        if (!TopicName.isValid(topic)) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+        for (String topic : conf.getTopicNames()) {
+            if (!TopicName.isValid(topic)) {
+                return FutureUtil.failedFuture(new PulsarClientException
+                        .InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+            }
         }
 
         if (conf.getStartMessageId() == null) {
@@ -535,6 +529,34 @@
                     .failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
         }
 
+        if (conf.getTopicNames().size() == 1) {
+            return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
+                    .thenCompose(schemaClone -> createSingleTopicReaderAsync(conf, schemaClone));
+        }
+        return createMultiTopicReaderAsync(conf, schema);
+    }
+
+    protected <T> CompletableFuture<Reader<T>> createMultiTopicReaderAsync(
+            ReaderConfigurationData<T> conf, Schema<T> schema) {
+        CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
+        CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
+        MultiTopicsReaderImpl<T> reader = new MultiTopicsReaderImpl<>(this,
+                conf, externalExecutorProvider, consumerSubscribedFuture, schema);
+        ConsumerBase<T> consumer = reader.getMultiTopicsConsumer();
+        consumers.add(consumer);
+        consumerSubscribedFuture.thenRun(() -> readerFuture.complete(reader))
+                .exceptionally(ex -> {
+                    log.warn("Failed to create multiTopicReader", ex);
+                    readerFuture.completeExceptionally(ex);
+                    return null;
+                });
+        return readerFuture;
+    }
+
+    protected <T> CompletableFuture<Reader<T>> createSingleTopicReaderAsync(
+            ReaderConfigurationData<T> conf, Schema<T> schema) {
+        String topic = conf.getTopicName();
+
         CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
 
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 36ec7be..b8f017f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -22,9 +22,11 @@
 import static org.apache.pulsar.client.api.KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NonNull;
@@ -79,7 +81,7 @@
 
     @Override
     public CompletableFuture<Reader<T>> createAsync() {
-        if (conf.getTopicName() == null) {
+        if (conf.getTopicNames().isEmpty()) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
         }
@@ -113,6 +115,17 @@
     }
 
     @Override
+    public ReaderBuilder<T> topics(List<String> topicNames) {
+        checkArgument(topicNames != null && topicNames.size() > 0,
+                "Passed in topicNames should not be null or empty.");
+        topicNames.forEach(topicName ->
+                checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
+        conf.getTopicNames().addAll(topicNames.stream().map(StringUtils::trim)
+                .collect(Collectors.toList()));
+        return this;
+    }
+
+    @Override
     public ReaderBuilder<T> startMessageId(MessageId startMessageId) {
         conf.setStartMessageId(startMessageId);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 2761602..14770ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -20,7 +20,9 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -35,7 +37,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    private String topicName;
+    private Set<String> topicNames = new HashSet<>();
 
     @JsonIgnore
     private MessageId startMessageId;
@@ -59,10 +61,27 @@
 
     private transient List<Range> keyHashRanges;
 
+    @JsonIgnore
+    public String getTopicName() {
+        if (topicNames.size() > 1) {
+            throw new IllegalArgumentException("topicNames needs to be = 1");
+        }
+        return topicNames.iterator().next();
+    }
+
+    @JsonIgnore
+    public void setTopicName(String topicNames) {
+        //Compatible with a single topic
+        this.topicNames.clear();
+        this.topicNames.add(topicNames);
+    }
+
     @SuppressWarnings("unchecked")
     public ReaderConfigurationData<T> clone() {
         try {
-            return (ReaderConfigurationData<T>) super.clone();
+            ReaderConfigurationData<T> clone = (ReaderConfigurationData<T>) super.clone();
+            clone.setTopicNames(new HashSet<>(clone.getTopicNames()));
+            return clone;
         } catch (CloneNotSupportedException e) {
             throw new RuntimeException("Failed to clone ReaderConfigurationData");
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index efd4e48..39c8544 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableSet;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -85,7 +86,7 @@
         String topicName = "test_src";
         MessageId messageId = new MessageIdImpl(1, 2, 3);
         Map<String, Object> config = new HashMap<>();
-        config.put("topicName", topicName);
+        config.put("topicNames", ImmutableSet.of(topicName));
         config.put("receiverQueueSize", 2000);
         ReaderBuilderImpl<byte[]> builder = (ReaderBuilderImpl<byte[]>) client.newReader()
             .startMessageId(messageId)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 3659ce8..692e20c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -19,8 +19,7 @@
 package org.apache.pulsar.client.impl.conf;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
 import org.testng.Assert;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -113,7 +112,7 @@
         confData.setReceiverQueueSize(1000000);
         confData.setReaderName("unknown-reader");
         Map<String, Object> config = new HashMap<>();
-        config.put("topicName", "test-topic");
+        config.put("topicNames", ImmutableSet.of("test-topic"));
         config.put("receiverQueueSize", 100);
         confData = ConfigurationDataUtils.loadData(config, confData, ReaderConfigurationData.class);
         assertEquals("test-topic", confData.getTopicName());