Support multi-topic for MultiTopicsReaderImpl (#9995)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a12e9a6..11d44e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -50,6 +51,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -244,6 +246,47 @@
publishMessages(topic,100,false);
}
+ @Test(timeOut = 20000)
+ public void testMultiTopic() throws Exception {
+ final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+ final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic2, 3);
+ final String topic3 = "persistent://my-property/my-ns/topic3" + UUID.randomUUID();
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+ PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .startMessageId(MessageId.earliest)
+ .topics(topics).readerName("my-reader").create();
+ // create producer and send msg
+ List<Producer<String>> producerList = new ArrayList<>();
+ for (String topicName : topics) {
+ producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+ }
+ int msgNum = 10;
+ Set<String> messages = new HashSet<>();
+ for (int i = 0; i < producerList.size(); i++) {
+ Producer<String> producer = producerList.get(i);
+ for (int j = 0; j < msgNum; j++) {
+ String msg = i + "msg" + j;
+ producer.send(msg);
+ messages.add(msg);
+ }
+ }
+ // receive messages
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext();
+ assertTrue(messages.remove(message.getValue()));
+ }
+ assertEquals(messages.size(), 0);
+ assertEquals(client.consumersCount(), 1);
+ // clean up
+ for (Producer<String> producer : producerList) {
+ producer.close();
+ }
+ reader.close();
+ Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+ }
+
@Test(timeOut = 10000)
public void testKeyHashRangeReader() throws Exception {
final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 045f0ef..a84208b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -110,6 +111,13 @@
ReaderBuilder<T> topic(String topicName);
/**
+ * Specify topics this reader will read from.
+ * @param topicNames
+ * @return
+ */
+ ReaderBuilder<T> topics(List<String> topicNames);
+
+ /**
* The initial reader positioning is done by specifying a message id. The options are:
* <ul>
* <li>{@link MessageId#earliest}: Start reading from the earliest message available in the topic</li>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index f2e9f8a..b9593bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -55,13 +55,12 @@
subscription = readerConfiguration.getSubscriptionName();
}
ConsumerConfigurationData<T> consumerConfiguration = new ConsumerConfigurationData<>();
- consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
+ consumerConfiguration.getTopicNames().addAll(readerConfiguration.getTopicNames());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
- consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
if (readerConfiguration.getReaderListener() != null) {
ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0790e75..684a64b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -510,24 +510,18 @@
}
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
- return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
- .thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone));
- }
-
- <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
-
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
-
- String topic = conf.getTopicName();
-
- if (!TopicName.isValid(topic)) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+ for (String topic : conf.getTopicNames()) {
+ if (!TopicName.isValid(topic)) {
+ return FutureUtil.failedFuture(new PulsarClientException
+ .InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+ }
}
if (conf.getStartMessageId() == null) {
@@ -535,6 +529,34 @@
.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
}
+ if (conf.getTopicNames().size() == 1) {
+ return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
+ .thenCompose(schemaClone -> createSingleTopicReaderAsync(conf, schemaClone));
+ }
+ return createMultiTopicReaderAsync(conf, schema);
+ }
+
+ protected <T> CompletableFuture<Reader<T>> createMultiTopicReaderAsync(
+ ReaderConfigurationData<T> conf, Schema<T> schema) {
+ CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
+ MultiTopicsReaderImpl<T> reader = new MultiTopicsReaderImpl<>(this,
+ conf, externalExecutorProvider, consumerSubscribedFuture, schema);
+ ConsumerBase<T> consumer = reader.getMultiTopicsConsumer();
+ consumers.add(consumer);
+ consumerSubscribedFuture.thenRun(() -> readerFuture.complete(reader))
+ .exceptionally(ex -> {
+ log.warn("Failed to create multiTopicReader", ex);
+ readerFuture.completeExceptionally(ex);
+ return null;
+ });
+ return readerFuture;
+ }
+
+ protected <T> CompletableFuture<Reader<T>> createSingleTopicReaderAsync(
+ ReaderConfigurationData<T> conf, Schema<T> schema) {
+ String topic = conf.getTopicName();
+
CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 36ec7be..b8f017f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -22,9 +22,11 @@
import static org.apache.pulsar.client.api.KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
import com.google.common.base.Preconditions;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
@@ -79,7 +81,7 @@
@Override
public CompletableFuture<Reader<T>> createAsync() {
- if (conf.getTopicName() == null) {
+ if (conf.getTopicNames().isEmpty()) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}
@@ -113,6 +115,17 @@
}
@Override
+ public ReaderBuilder<T> topics(List<String> topicNames) {
+ checkArgument(topicNames != null && topicNames.size() > 0,
+ "Passed in topicNames should not be null or empty.");
+ topicNames.forEach(topicName ->
+ checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
+ conf.getTopicNames().addAll(topicNames.stream().map(StringUtils::trim)
+ .collect(Collectors.toList()));
+ return this;
+ }
+
+ @Override
public ReaderBuilder<T> startMessageId(MessageId startMessageId) {
conf.setStartMessageId(startMessageId);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 2761602..14770ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -20,7 +20,9 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -35,7 +37,7 @@
private static final long serialVersionUID = 1L;
- private String topicName;
+ private Set<String> topicNames = new HashSet<>();
@JsonIgnore
private MessageId startMessageId;
@@ -59,10 +61,27 @@
private transient List<Range> keyHashRanges;
+ @JsonIgnore
+ public String getTopicName() {
+ if (topicNames.size() > 1) {
+ throw new IllegalArgumentException("topicNames needs to be = 1");
+ }
+ return topicNames.iterator().next();
+ }
+
+ @JsonIgnore
+ public void setTopicName(String topicNames) {
+ //Compatible with a single topic
+ this.topicNames.clear();
+ this.topicNames.add(topicNames);
+ }
+
@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {
try {
- return (ReaderConfigurationData<T>) super.clone();
+ ReaderConfigurationData<T> clone = (ReaderConfigurationData<T>) super.clone();
+ clone.setTopicNames(new HashSet<>(clone.getTopicNames()));
+ return clone;
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ReaderConfigurationData");
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index efd4e48..39c8544 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableSet;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -85,7 +86,7 @@
String topicName = "test_src";
MessageId messageId = new MessageIdImpl(1, 2, 3);
Map<String, Object> config = new HashMap<>();
- config.put("topicName", topicName);
+ config.put("topicNames", ImmutableSet.of(topicName));
config.put("receiverQueueSize", 2000);
ReaderBuilderImpl<byte[]> builder = (ReaderBuilderImpl<byte[]>) client.newReader()
.startMessageId(messageId)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 3659ce8..692e20c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -19,8 +19,7 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
import org.testng.Assert;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -113,7 +112,7 @@
confData.setReceiverQueueSize(1000000);
confData.setReaderName("unknown-reader");
Map<String, Object> config = new HashMap<>();
- config.put("topicName", "test-topic");
+ config.put("topicNames", ImmutableSet.of("test-topic"));
config.put("receiverQueueSize", 100);
confData = ConfigurationDataUtils.loadData(config, confData, ReaderConfigurationData.class);
assertEquals("test-topic", confData.getTopicName());