Revert "[Issue 8340] [pulsar-testclient] Fix to support to specify topics and subscriptions (#9716)"
This reverts commit f4a3f4a0126cfd863772fa2fed57c1ef0113563c.
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 1f915c1..2b436c8 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -22,9 +22,10 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.FileInputStream;
+import java.nio.file.Paths;
import java.text.DecimalFormat;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -83,13 +84,9 @@
@Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
public int numSubscriptions = 1;
- @Deprecated
- @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
+ @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
public String subscriberName = "sub";
- @Parameter(names = { "-ss", "--subscriptions" }, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
- public List<String> subscriptions = Collections.singletonList("sub");
-
@Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
@@ -108,7 +105,7 @@
@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;
- @Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis")
+ @Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis")
public int acknowledgmentsGroupingDelayMillis = 100;
@Parameter(names = { "-c",
@@ -188,8 +185,8 @@
System.exit(-1);
}
- if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
- System.out.println("The size of topics list should be equal to --num-topics");
+ if (arguments.topic.size() != 1) {
+ System.out.println("Only one topic name is allowed");
jc.usage();
System.exit(-1);
}
@@ -200,14 +197,6 @@
System.exit(-1);
}
- if (arguments.subscriptionType != SubscriptionType.Exclusive &&
- arguments.subscriptions != null &&
- arguments.subscriptions.size() != arguments.numConsumers) {
- System.out.println("The size of subscriptions list should be equal to --num-consumers when subscriptionType isn't Exclusive");
- jc.usage();
- System.exit(-1);
- }
-
if (arguments.confFile != null) {
Properties prop = new Properties(System.getProperties());
prop.load(new FileInputStream(arguments.confFile));
@@ -249,6 +238,8 @@
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments));
+ final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
+
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
@@ -323,12 +314,18 @@
}
for (int i = 0; i < arguments.numTopics; i++) {
- final TopicName topicName = TopicName.get(arguments.topic.get(i));
-
+ final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
+ : TopicName.get(String.format("%s-%d", prefixTopicName, i));
log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName);
for (int j = 0; j < arguments.numSubscriptions; j++) {
- String subscriberName = arguments.subscriptions.get(j);
+ String subscriberName;
+ if (arguments.numSubscriptions > 1) {
+ subscriberName = String.format("%s-%d", arguments.subscriberName, j);
+ } else {
+ subscriberName = arguments.subscriberName;
+ }
+
for (int k = 0; k < arguments.numConsumers; k++) {
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
.subscribeAsync());
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 13ce5a0..49a2284 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -42,6 +42,7 @@
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@@ -233,8 +234,8 @@
System.exit(-1);
}
- if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) {
- System.out.println("The size of topics list should be equal to --num-topic");
+ if (arguments.topics.size() != 1) {
+ System.out.println("Only one topic name is allowed");
jc.usage();
System.exit(-1);
}
@@ -396,6 +397,7 @@
PulsarClient client = null;
try {
// Now processing command line arguments
+ String prefixTopicName = arguments.topics.get(0);
List<Future<Producer<byte[]>>> futures = Lists.newArrayList();
ClientBuilder clientBuilder = PulsarClient.builder() //
@@ -448,7 +450,7 @@
}
for (int i = 0; i < arguments.numTopics; i++) {
- String topic = arguments.topics.get(i);
+ String topic = (arguments.numTopics == 1) ? prefixTopicName : String.format("%s-%d", prefixTopicName, i);
log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
for (int j = 0; j < arguments.numProducers; j++) {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index dc7041e..ab046f8 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -141,8 +141,8 @@
System.exit(-1);
}
- if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
- System.out.println("The size of topics list should be equal to --num-topics");
+ if (arguments.topic.size() != 1) {
+ System.out.println("Only one topic name is allowed");
jc.usage();
System.exit(-1);
}
@@ -192,6 +192,8 @@
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance reader with config: {}", w.writeValueAsString(arguments));
+ final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
+
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
@@ -250,7 +252,8 @@
.startMessageId(startMessageId);
for (int i = 0; i < arguments.numTopics; i++) {
- final TopicName topicName = TopicName.get(arguments.topic.get(i));
+ final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
+ : TopicName.get(String.format("%s-%d", prefixTopicName, i));
futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
}
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 09a1d8e..0d7a6f9 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -422,19 +422,19 @@
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`--listener-name`|Listener name for the broker||
-|`--acks-delay-millis`|Acknowledgements grouping delay in millis|100|
+|`--acks-delay-millis`|Acknowlegments grouping delay in millis|100|
|`-k`, `--encryption-key-name`|The private key name to decrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
|`-h`, `--help`|Help message|false|
|`--conf-file`|Configuration file||
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
-|`-t`, `--num-topics`|The number of topics|1|
+|`-t`, `--num-topic`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message consumer (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-u`, `--service-url`|Pulsar service URL||
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
-|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
+|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
@@ -497,7 +497,7 @@
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
-|`-t`, `--num-topics`|The number of topics|1|
+|`-t`, `--num-topic`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-u`, `--service-url`|Pulsar service URL||