| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.tests.integration.cli; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| import java.util.concurrent.TimeUnit; |
| import java.util.UUID; |
| import lombok.Cleanup; |
| import org.apache.http.client.methods.CloseableHttpResponse; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.impl.client.CloseableHttpClient; |
| import org.apache.http.impl.client.HttpClients; |
| import org.apache.http.util.EntityUtils; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.functions.api.examples.pojo.Tick; |
| import org.apache.pulsar.tests.integration.containers.BrokerContainer; |
| import org.apache.pulsar.tests.integration.docker.ContainerExecException; |
| import org.apache.pulsar.tests.integration.docker.ContainerExecResult; |
| import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; |
| import org.apache.pulsar.tests.integration.topologies.PulsarCluster; |
| import org.testng.Assert; |
| import org.testng.annotations.Test; |
| |
| /** |
| * Test Pulsar CLI. |
| */ |
| public class CLITest extends PulsarTestSuite { |
| |
| @Test |
| public void testDeprecatedCommands() throws Exception { |
| String tenantName = "test-deprecated-commands"; |
| |
| ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help"); |
| assertFalse(result.getStdout().isEmpty()); |
| assertFalse(result.getStdout().contains("Usage: properties ")); |
| result = pulsarCluster.runAdminCommandOnAnyBroker( |
| "properties", "create", tenantName, |
| "--allowed-clusters", pulsarCluster.getClusterName(), |
| "--admin-roles", "admin" |
| ); |
| assertTrue(result.getStderr().contains("deprecated")); |
| |
| result = pulsarCluster.runAdminCommandOnAnyBroker( |
| "properties", "list"); |
| assertTrue(result.getStdout().contains(tenantName)); |
| result = pulsarCluster.runAdminCommandOnAnyBroker( |
| "tenants", "list"); |
| assertTrue(result.getStdout().contains(tenantName)); |
| } |
| |
| @Test |
| public void testGetTopicListCommand() throws Exception { |
| ContainerExecResult result; |
| |
| final String namespaceLocalName = "list-topics-" + randomName(8); |
| result = pulsarCluster.createNamespace(namespaceLocalName); |
| final String namespace = "public/" + namespaceLocalName; |
| assertEquals(0, result.getExitCode()); |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); |
| |
| final String persistentTopicName = TopicName.get( |
| "persistent", |
| NamespaceName.get(namespace), |
| "get_topics_mode_" + UUID.randomUUID()).toString(); |
| |
| final String nonPersistentTopicName = TopicName.get( |
| "non-persistent", |
| NamespaceName.get(namespace), |
| "get_topics_mode_" + UUID.randomUUID()).toString(); |
| |
| Producer<byte[]> producer1 = client.newProducer() |
| .topic(persistentTopicName) |
| .create(); |
| |
| Producer<byte[]> producer2 = client.newProducer() |
| .topic(nonPersistentTopicName) |
| .create(); |
| |
| BrokerContainer container = pulsarCluster.getAnyBroker(); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "list", |
| namespace); |
| |
| assertTrue(result.getStdout().contains(persistentTopicName)); |
| assertTrue(result.getStdout().contains(nonPersistentTopicName)); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "list", |
| "--topic-domain", |
| "persistent", |
| namespace); |
| |
| assertTrue(result.getStdout().contains(persistentTopicName)); |
| assertFalse(result.getStdout().contains(nonPersistentTopicName)); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "list", |
| "--topic-domain", |
| "non_persistent", |
| namespace); |
| |
| assertFalse(result.getStdout().contains(persistentTopicName)); |
| assertTrue(result.getStdout().contains(nonPersistentTopicName)); |
| |
| try { |
| container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "list", |
| "--topic-domain", |
| "none", |
| namespace); |
| fail(); |
| } catch (ContainerExecException ignore) { |
| } |
| |
| producer1.close(); |
| producer2.close(); |
| } |
| |
| @Test |
| public void testCreateSubscriptionCommand() throws Exception { |
| String topic = "testCreateSubscriptionCommmand"; |
| |
| String subscriptionPrefix = "subscription-"; |
| |
| int i = 0; |
| for (BrokerContainer container : pulsarCluster.getBrokers()) { |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "create-subscription", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| result.assertNoOutput(); |
| i++; |
| } |
| } |
| |
| @Test |
| public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception { |
| String topic = "testCreateSubscriptionCommmand"; |
| |
| String subscriptionPrefix = "subscription-"; |
| |
| int i = 0; |
| for (BrokerContainer container : pulsarCluster.getBrokers()) { |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "create-subscription", |
| "-p", |
| "a=b", |
| "-p", |
| "c=d", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| result.assertNoOutput(); |
| |
| ContainerExecResult resultUpdate = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "update-subscription-properties", |
| "-p", |
| "a=e", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| resultUpdate.assertNoOutput(); |
| |
| ContainerExecResult resultGet = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "get-subscription-properties", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| assertEquals( |
| resultGet.getStdout().trim(), "{\"a\":\"e\"}", |
| "unexpected output " + resultGet.getStdout() + " - error " + resultGet.getStderr()); |
| |
| ContainerExecResult resultClear = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "update-subscription-properties", |
| "-c", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| resultClear.assertNoOutput(); |
| |
| ContainerExecResult resultGetAfterClear = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "get-subscription-properties", |
| "persistent://public/default/" + topic, |
| "--subscription", |
| "" + subscriptionPrefix + i |
| ); |
| assertEquals( |
| resultGetAfterClear.getStdout().trim(), "{}", |
| "unexpected output " + resultGetAfterClear.getStdout() |
| + " - error " + resultGetAfterClear.getStderr()); |
| |
| i++; |
| } |
| } |
| |
| @Test |
| public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception { |
| String topicName = "persistent://public/default/test-topic-termination"; |
| BrokerContainer container = pulsarCluster.getAnyBroker(); |
| container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "create", |
| topicName); |
| |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.CLIENT_SCRIPT, |
| "produce", |
| "-m", |
| "\"test topic termination\"", |
| "-n", |
| "1", |
| topicName); |
| |
| assertTrue(result.getStdout().contains("1 messages successfully produced")); |
| |
| // terminate the topic |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "topics", |
| "terminate", |
| topicName); |
| assertTrue(result.getStdout().contains("Topic successfully terminated at")); |
| |
| // try to produce should fail |
| try { |
| pulsarCluster.getAnyBroker().execCmd(PulsarCluster.CLIENT_SCRIPT, |
| "produce", |
| "-m", |
| "\"test topic termination\"", |
| "-n", |
| "1", |
| topicName); |
| fail("Command should have exited with non-zero"); |
| } catch (ContainerExecException e) { |
| assertTrue(e.getResult().getStdout().contains("Topic was already terminated")); |
| } |
| } |
| |
| @Test |
| public void testPropertiesCLI() throws Exception { |
| final BrokerContainer container = pulsarCluster.getAnyBroker(); |
| final String namespace = "public/default"; |
| |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "set-property", |
| "-k", |
| "a", |
| "-v", |
| "a", |
| namespace); |
| assertTrue(result.getStdout().isEmpty()); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "get-property", |
| "-k", |
| "a", |
| namespace); |
| assertTrue(result.getStdout().contains("a")); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "remove-property", |
| "-k", |
| "a", |
| namespace); |
| assertTrue(result.getStdout().contains("a")); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "remove-property", |
| "-k", |
| "a", |
| namespace); |
| assertTrue(result.getStdout().contains("null")); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "set-properties", |
| "-p", |
| "a=a,b=b,c=c", |
| namespace); |
| assertTrue(result.getStdout().isEmpty()); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "get-properties", |
| namespace); |
| assertFalse(result.getStdout().isEmpty()); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "clear-properties", |
| namespace); |
| assertTrue(result.getStdout().isEmpty()); |
| |
| try { |
| container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "bookies", |
| "set-bookie-rack", |
| "-b", "localhost:8082", |
| "-r", ""); |
| fail("Command should have exited with non-zero"); |
| } catch (ContainerExecException e) { |
| assertEquals(e.getResult().getStderr(), "rack name is invalid, it should not be null, empty or '/'\n\n"); |
| } |
| |
| try { |
| container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "namespaces", |
| "set-schema-autoupdate-strategy", |
| namespace); |
| } catch (ContainerExecException e) { |
| assertEquals(e.getResult().getStderr(), "Either --compatibility or --disabled must be specified\n\n"); |
| } |
| } |
| |
| @Test |
| public void testSchemaCLI() throws Exception { |
| BrokerContainer container = pulsarCluster.getAnyBroker(); |
| String topicName = "persistent://public/default/test-schema-cli"; |
| |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.CLIENT_SCRIPT, |
| "produce", |
| "-m", |
| "\"test topic schema\"", |
| "-n", |
| "1", |
| topicName); |
| assertTrue(result.getStdout().contains("1 messages successfully produced")); |
| |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "schemas", |
| "upload", |
| topicName, |
| "-f", |
| "/pulsar/conf/schema_example.conf" |
| ); |
| result.assertNoOutput(); |
| |
| // get schema |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "schemas", |
| "get", |
| topicName); |
| assertTrue(result.getStdout().contains("\"type\": \"STRING\"")); |
| |
| // delete the schema |
| result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "schemas", |
| "delete", |
| topicName); |
| result.assertNoOutput(); |
| |
| // get schema again |
| try { |
| container.execCmd(PulsarCluster.ADMIN_SCRIPT, |
| "schemas", |
| "get", |
| "persistent://public/default/test-schema-cli" |
| ); |
| fail("Command should have exited with non-zero"); |
| } catch (ContainerExecException e) { |
| assertTrue(e.getResult().getStderr().contains("Schema not found")); |
| } |
| |
| try { |
| container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "schemas", |
| "extract", |
| "--jar", "/pulsar/examples/api-examples.jar", |
| "--type", "xml", |
| "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick", |
| topicName); |
| fail("Command should have exited with non-zero"); |
| } catch (ContainerExecException e) { |
| assertEquals(e.getResult().getStderr(), "Invalid schema type xml. Valid options are: avro, json\n\n"); |
| } |
| } |
| |
| @Test |
| public void testSetInfiniteRetention() throws Exception { |
| ContainerExecResult result; |
| |
| String namespace = "get-and-set-retention" + randomName(8); |
| pulsarCluster.createNamespace(namespace); |
| |
| String[] setCommand = { |
| "namespaces", "set-retention", "public/" + namespace, |
| "--size", "-1", |
| "--time", "-1" |
| }; |
| |
| result = pulsarCluster.runAdminCommandOnAnyBroker(setCommand); |
| result.assertNoOutput(); |
| |
| String[] getCommand = { |
| "namespaces", "get-retention", "public/" + namespace |
| }; |
| |
| result = pulsarCluster.runAdminCommandOnAnyBroker(getCommand); |
| assertTrue( |
| result.getStdout().contains("\"retentionTimeInMinutes\" : -1"), |
| result.getStdout()); |
| assertTrue( |
| result.getStdout().contains("\"retentionSizeInMB\" : -1"), |
| result.getStdout()); |
| } |
| |
| // authorization related tests |
| |
| @Test |
| public void testGrantPermissionsAuthorizationDisabled() throws Exception { |
| ContainerExecResult result; |
| |
| String namespace = "grant-permissions-" + randomName(8); |
| result = pulsarCluster.createNamespace(namespace); |
| assertEquals(0, result.getExitCode()); |
| |
| String[] grantCommand = { |
| "namespaces", "grant-permission", "public/" + namespace, |
| "--actions", "produce", |
| "--role", "test-role" |
| }; |
| try { |
| pulsarCluster.runAdminCommandOnAnyBroker(grantCommand); |
| } catch (ContainerExecException cee) { |
| result = cee.getResult(); |
| assertTrue(result.getStderr().contains("HTTP 501 Not Implemented"), result.getStderr()); |
| } |
| } |
| |
| @Test |
| public void testJarPojoSchemaUploadAvro() throws Exception { |
| |
| ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker( |
| "schemas", |
| "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "avro", |
| "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick", |
| "persistent://public/default/pojo-avro"); |
| |
| Assert.assertEquals(containerExecResult.getExitCode(), 0); |
| testPublishAndConsume("persistent://public/default/pojo-avro", "avro", Schema.AVRO(Tick.class)); |
| } |
| |
| @Test |
| public void testJarPojoSchemaUploadJson() throws Exception { |
| |
| ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker( |
| "schemas", |
| "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "json", |
| "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick", |
| "persistent://public/default/pojo-json"); |
| |
| Assert.assertEquals(containerExecResult.getExitCode(), 0); |
| testPublishAndConsume("persistent://public/default/pojo-json", "json", Schema.JSON(Tick.class)); |
| } |
| |
| private void testPublishAndConsume(String topic, String sub, Schema<Tick> type) throws PulsarClientException { |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); |
| |
| @Cleanup |
| Producer<Tick> producer = client.newProducer(type) |
| .topic(topic + "-message") |
| .create(); |
| |
| @Cleanup |
| Consumer<Tick> consumer = client.newConsumer(type) |
| .topic(topic + "-message") |
| .subscriptionName(sub) |
| .subscribe(); |
| |
| final int numOfMessages = 10; |
| |
| for (int i = 1; i < numOfMessages; ++i) { |
| producer.send(new Tick(i, "Stock_" + i, 100 + i, 110 + i)); |
| } |
| |
| for (int i = 1; i < numOfMessages; ++i) { |
| Tick expected = new Tick(i, "Stock_" + i, 100 + i, 110 + i); |
| Message<Tick> receive = consumer.receive(5, TimeUnit.SECONDS); |
| Assert.assertEquals(receive.getValue(), expected); |
| } |
| } |
| |
| @Test |
| public void testListNonPersistentTopicsCmd() throws Exception { |
| String persistentTopic = "test-list-non-persistent-topic"; |
| ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", persistentTopic); |
| assertEquals(result.getExitCode(), 0); |
| HttpGet get = new HttpGet(pulsarCluster.getHttpServiceUrl() + "/admin/v2/non-persistent/public/default"); |
| try (CloseableHttpClient client = HttpClients.createDefault(); |
| CloseableHttpResponse response = client.execute(get)) { |
| assertFalse(EntityUtils.toString(response.getEntity()).contains(persistentTopic)); |
| } |
| } |
| |
| @Test |
| public void testGenerateDocForModule() throws Exception { |
| String[] moduleNames = { |
| "clusters", |
| "tenants", |
| "brokers", |
| "broker-stats", |
| "namespaces", |
| "topics", |
| "schemas", |
| "bookies", |
| "functions", |
| "ns-isolation-policy", |
| "resource-quotas", |
| "functions", |
| "sources", |
| "sinks" |
| }; |
| BrokerContainer container = pulsarCluster.getAnyBroker(); |
| for (int i = 0; i < moduleNames.length; i++) { |
| ContainerExecResult result = container.execCmd( |
| PulsarCluster.ADMIN_SCRIPT, |
| "documents", "generate", moduleNames[i]); |
| Assert.assertTrue(result.getStdout().contains("# " + moduleNames[i])); |
| } |
| } |
| |
| } |