blob: 4a1589250add4bc65ad7a4c38c31b6a60c814c56 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class PulsarClientToolTest extends BrokerTestBase {
@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testInitialization() throws InterruptedException, ExecutionException, PulsarAdminException {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
String tenantName = UUID.randomUUID().toString();
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant(tenantName, tenantInfo);
String topicName = String.format("persistent://%s/ns/topic-scale-ns-0/topic", tenantName);
int numberOfMessages = 10;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<Void>();
executor.execute(() -> {
PulsarClientTool pulsarClientToolConsumer;
try {
pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-r", "30", topicName };
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
Awaitility.await()
.ignoreExceptions()
.until(()->admin.topics().getSubscriptions(topicName).size() == 1);
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);
String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName };
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
future.get();
}
@Test(timeOut = 20000)
public void testNonDurableSubscribe() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("non-durable");
int numberOfMessages = 10;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-m", "NonDurable", "-r", "30", topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
// Make sure subscription has been created
retryStrategically((test) -> {
try {
return admin.topics().getSubscriptions(topicName).size() == 1;
} catch (Exception e) {
return false;
}
}, 10, 500);
assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);
String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName};
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
Assert.assertFalse(future.isCompletedExceptionally());
future.get();
Awaitility.await()
.ignoreExceptions()
.atMost(Duration.ofMillis(20000))
.until(()->admin.topics().getSubscriptions(topicName).size() == 0);
}
@Test(timeOut = 60000)
public void testDurableSubscribe() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("durable");
int numberOfMessages = 10;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-m", "Durable", "-r", "30", topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
// Make sure subscription has been created
Awaitility.await()
.atMost(Duration.ofMillis(60000))
.ignoreExceptions()
.until(() -> admin.topics().getSubscriptions(topicName).size() == 1);
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);
String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName};
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
Assert.fail("consumer was unable to receive messages", e);
}
}
@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("encryption");
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"consume", "-s", "sub-name", "-n", Integer.toString(numberOfMessages), "-ekv",
keyUriBase + "private-key.client-rsa.pem", topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
// Make sure subscription has been created
Awaitility.await()
.atMost(Duration.ofMillis(20000))
.ignoreExceptions()
.until(() -> admin.topics().getSubscriptions(topicName).size() == 1);
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);
String[] args = {"produce", "-m", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-ekn",
"my-app-key", "-ekv", keyUriBase + "public-key.client-rsa.pem", topicName};
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
Assert.fail("consumer was unable to decrypt messages", e);
}
}
@Test(timeOut = 20000)
public void testDisableBatching() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("disable-batching");
final int numberOfMessages = 5;
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
Assert.assertEquals(pulsarClientTool1.run(args1), 0);
PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
Assert.assertEquals(pulsarClientTool2.run(args2), 0);
for (int i = 0; i < numberOfMessages * 2; i++) {
Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
if (i < numberOfMessages) {
Assert.assertEquals(new String(msg.getData()), "batched");
Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
} else {
Assert.assertEquals(new String(msg.getData()), "non-batched");
Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
}
}
}
private static String getTopicWithRandomSuffix(String localNameBase) {
return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
}
}