blob: 93d547f9a5905e04f18790d28825c7144602510a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.NonPersistentTopics;
import org.apache.pulsar.client.admin.ProxyStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ResourceQuotas;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
public class PulsarAdminToolTest {
@Test
public void brokers() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Brokers mockBrokers = mock(Brokers.class);
doReturn(mockBrokers).when(admin).brokers();
CmdBrokers brokers = new CmdBrokers(() -> admin);
brokers.run(split("list use"));
verify(mockBrokers).getActiveBrokers("use");
brokers.run(split("leader-broker"));
verify(mockBrokers).getLeaderBroker();
brokers.run(split("namespaces use --url http://my-service.url:8080"));
verify(mockBrokers).getOwnedNamespaces("use", "http://my-service.url:8080");
brokers.run(split("get-all-dynamic-config"));
verify(mockBrokers).getAllDynamicConfigurations();
brokers.run(split("list-dynamic-config"));
verify(mockBrokers).getDynamicConfigurationNames();
brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");
brokers.run(split("delete-dynamic-config --config brokerShutdownTimeoutMs"));
verify(mockBrokers).deleteDynamicConfiguration("brokerShutdownTimeoutMs");
brokers.run(split("get-internal-config"));
verify(mockBrokers).getInternalConfigurationData();
brokers.run(split("get-runtime-config"));
verify(mockBrokers).getRuntimeConfigurations();
brokers.run(split("healthcheck"));
verify(mockBrokers).healthcheck();
brokers.run(split("version"));
verify(mockBrokers).getVersion();
}
@Test
public void brokerStats() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
BrokerStats mockBrokerStats = mock(BrokerStats.class);
doReturn(mockBrokerStats).when(admin).brokerStats();
CmdBrokerStats brokerStats = new CmdBrokerStats(() -> admin);
brokerStats.run(split("topics"));
verify(mockBrokerStats).getTopics();
brokerStats.run(split("load-report"));
verify(mockBrokerStats).getLoadReport();
brokerStats.run(split("mbeans"));
verify(mockBrokerStats).getMBeans();
brokerStats.run(split("monitoring-metrics"));
verify(mockBrokerStats).getMetrics();
}
@Test
public void getOwnedNamespaces() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Brokers mockBrokers = mock(Brokers.class);
doReturn(mockBrokers).when(admin).brokers();
CmdBrokers brokers = new CmdBrokers(() -> admin);
brokers.run(split("namespaces use --url http://my-service.url:4000"));
verify(mockBrokers).getOwnedNamespaces("use", "http://my-service.url:4000");
}
@Test
public void clusters() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Clusters mockClusters = mock(Clusters.class);
when(admin.clusters()).thenReturn(mockClusters);
CmdClusters clusters = new CmdClusters(() -> admin);
clusters.run(split("list"));
verify(mockClusters).getClusters();
clusters.run(split("get use"));
verify(mockClusters).getCluster("use");
clusters.run(split("create use --url http://my-service.url:8080"));
verify(mockClusters).createCluster("use", new ClusterData("http://my-service.url:8080", null));
clusters.run(split("update use --url http://my-service.url:8080"));
verify(mockClusters).updateCluster("use", new ClusterData("http://my-service.url:8080", null));
clusters.run(split("delete use"));
verify(mockClusters).deleteCluster("use");
clusters.run(split("list-failure-domains use"));
verify(mockClusters).getFailureDomains("use");
clusters.run(split("get-failure-domain use --domain-name domain"));
verify(mockClusters).getFailureDomain("use", "domain");
clusters.run(split("create-failure-domain use --domain-name domain --broker-list b1"));
FailureDomain domain = new FailureDomain();
domain.setBrokers(Sets.newHashSet("b1"));
verify(mockClusters).createFailureDomain("use", "domain", domain);
clusters.run(split("update-failure-domain use --domain-name domain --broker-list b1"));
verify(mockClusters).updateFailureDomain("use", "domain", domain);
clusters.run(split("delete-failure-domain use --domain-name domain"));
verify(mockClusters).deleteFailureDomain("use", "domain");
// Re-create CmdClusters to avoid a issue.
// See https://github.com/cbeust/jcommander/issues/271
clusters = new CmdClusters(() -> admin);
clusters.run(
split("create my-cluster --url http://my-service.url:8080 --url-secure https://my-service.url:4443"));
verify(mockClusters).createCluster("my-cluster",
new ClusterData("http://my-service.url:8080", "https://my-service.url:4443"));
clusters.run(
split("update my-cluster --url http://my-service.url:8080 --url-secure https://my-service.url:4443"));
verify(mockClusters).updateCluster("my-cluster",
new ClusterData("http://my-service.url:8080", "https://my-service.url:4443"));
clusters.run(split("delete my-cluster"));
verify(mockClusters).deleteCluster("my-cluster");
clusters.run(split("update-peer-clusters my-cluster --peer-clusters c1,c2"));
verify(mockClusters).updatePeerClusterNames("my-cluster", Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2")));
clusters.run(split("get-peer-clusters my-cluster"));
verify(mockClusters).getPeerClusterNames("my-cluster");
// test create cluster without --url
clusters = new CmdClusters(() -> admin);
clusters.run(split("create my-secure-cluster --url-secure https://my-service.url:4443"));
verify(mockClusters).createCluster("my-secure-cluster", new ClusterData(null, "https://my-service.url:4443"));
clusters.run(split("update my-secure-cluster --url-secure https://my-service.url:4443"));
verify(mockClusters).updateCluster("my-secure-cluster", new ClusterData(null, "https://my-service.url:4443"));
clusters.run(split("delete my-secure-cluster"));
verify(mockClusters).deleteCluster("my-secure-cluster");
}
@Test
public void tenants() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Tenants mockTenants = mock(Tenants.class);
when(admin.tenants()).thenReturn(mockTenants);
CmdTenants tenants = new CmdTenants(() -> admin);
tenants.run(split("list"));
verify(mockTenants).getTenants();
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
tenants.run(split("create my-tenant --admin-roles role1,role2 --allowed-clusters use"));
verify(mockTenants).createTenant("my-tenant", tenantInfo);
tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("usw"));
tenants.run(split("update my-tenant --admin-roles role1,role2 --allowed-clusters usw"));
verify(mockTenants).updateTenant("my-tenant", tenantInfo);
tenants.run(split("get my-tenant"));
verify(mockTenants).getTenantInfo("my-tenant");
tenants.run(split("delete my-tenant"));
verify(mockTenants).deleteTenant("my-tenant", false);
}
@Test
public void namespaces() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("list myprop"));
verify(mockNamespaces).getNamespaces("myprop");
namespaces.run(split("list-cluster myprop/clust"));
verify(mockNamespaces).getNamespaces("myprop", "clust");
namespaces.run(split("topics myprop/clust/ns1"));
verify(mockNamespaces).getTopics("myprop/clust/ns1");
namespaces.run(split("policies myprop/clust/ns1"));
verify(mockNamespaces).getPolicies("myprop/clust/ns1");
namespaces.run(split("create myprop/clust/ns1"));
verify(mockNamespaces).createNamespace("myprop/clust/ns1");
namespaces.run(split("delete myprop/clust/ns1"));
verify(mockNamespaces).deleteNamespace("myprop/clust/ns1", false);
namespaces.run(split("permissions myprop/clust/ns1"));
verify(mockNamespaces).getPermissions("myprop/clust/ns1");
namespaces.run(split("grant-permission myprop/clust/ns1 --role role1 --actions produce,consume"));
verify(mockNamespaces).grantPermissionOnNamespace("myprop/clust/ns1", "role1",
EnumSet.of(AuthAction.produce, AuthAction.consume));
namespaces.run(split("revoke-permission myprop/clust/ns1 --role role1"));
verify(mockNamespaces).revokePermissionsOnNamespace("myprop/clust/ns1", "role1");
namespaces.run(split("set-clusters myprop/clust/ns1 -c use,usw,usc"));
verify(mockNamespaces).setNamespaceReplicationClusters("myprop/clust/ns1",
Sets.newHashSet("use", "usw", "usc"));
namespaces.run(split("get-clusters myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1");
namespaces.run(split("set-subscription-types-enabled myprop/clust/ns1 -t Shared,Failover"));
verify(mockNamespaces).setSubscriptionTypesEnabled("myprop/clust/ns1",
Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
namespaces.run(split("get-subscription-types-enabled myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionTypesEnabled("myprop/clust/ns1");
namespaces
.run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2"));
verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1",
new BookieAffinityGroupData("test1", "test2"));
namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");
namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1");
namespaces.run(split("set-replicator-dispatch-rate myprop/clust/ns1 -md 10 -bd 11 -dt 12"));
verify(mockNamespaces).setReplicatorDispatchRate("myprop/clust/ns1", new DispatchRate(10, 11, 12));
namespaces.run(split("get-replicator-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getReplicatorDispatchRate("myprop/clust/ns1");
namespaces.run(split("remove-replicator-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeReplicatorDispatchRate("myprop/clust/ns1");
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff"));
verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff");
namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff"));
verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null);
namespaces.run(split("get-backlog-quotas myprop/clust/ns1"));
verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1");
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_request_hold -l 10"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
new BacklogQuota(10, RetentionPolicy.producer_request_hold));
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10K"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
new BacklogQuota(10 * 1024, RetentionPolicy.producer_exception));
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10M"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
new BacklogQuota(10 * 1024 * 1024, RetentionPolicy.producer_exception));
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
new BacklogQuota(10l * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0"));
verify(mockNamespaces).setPersistence("myprop/clust/ns1", new PersistencePolicies(2, 1, 1, 100.0d));
namespaces.run(split("get-persistence myprop/clust/ns1"));
verify(mockNamespaces).getPersistence("myprop/clust/ns1");
namespaces.run(split("remove-persistence myprop/clust/ns1"));
verify(mockNamespaces).removePersistence("myprop/clust/ns1");
namespaces.run(split("get-max-subscriptions-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1");
namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1 -m 300"));
verify(mockNamespaces).setMaxSubscriptionsPerTopic("myprop/clust/ns1", 300);
namespaces.run(split("remove-max-subscriptions-per-topic myprop/clust/ns1"));
verify(mockNamespaces).removeMaxSubscriptionsPerTopic("myprop/clust/ns1");
namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300"));
verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300);
namespaces.run(split("set-subscription-expiration-time myprop/clust/ns1 -t 60"));
verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60);
namespaces.run(split("get-deduplication myprop/clust/ns1"));
verify(mockNamespaces).getDeduplicationStatus("myprop/clust/ns1");
namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true);
namespaces.run(split("remove-deduplication myprop/clust/ns1"));
verify(mockNamespaces).removeDeduplicationStatus("myprop/clust/ns1");
namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned"));
verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1"));
verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1");
namespaces.run(split("set-auto-subscription-creation myprop/clust/ns1 -e"));
verify(mockNamespaces).setAutoSubscriptionCreation("myprop/clust/ns1",
new AutoSubscriptionCreationOverride(true));
namespaces.run(split("remove-auto-subscription-creation myprop/clust/ns1"));
verify(mockNamespaces).removeAutoSubscriptionCreation("myprop/clust/ns1");
namespaces.run(split("get-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
namespaces.run(split("get-subscription-expiration-time myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1");
namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group"));
verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group");
namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group"));
verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group");
namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
namespaces.run(split("get-retention myprop/clust/ns1"));
verify(mockNamespaces).getRetention("myprop/clust/ns1");
namespaces.run(split("remove-retention myprop/clust/ns1"));
verify(mockNamespaces).removeRetention("myprop/clust/ns1");
namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true));
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
namespaces.run(split("remove-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1");
namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
namespaces.run(split("remove-inactive-topic-policies myprop/clust/ns1"));
verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1");
namespaces.run(split("clear-backlog myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("clear-backlog -b 0x80000000_0xffffffff myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBundleBacklog("myprop/clust/ns1", "0x80000000_0xffffffff");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("clear-backlog -s my-sub myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBacklogForSubscription("myprop/clust/ns1", "my-sub");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("clear-backlog -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBundleBacklogForSubscription("myprop/clust/ns1", "0x80000000_0xffffffff",
"my-sub");
namespaces.run(split("unsubscribe -s my-sub myprop/clust/ns1"));
verify(mockNamespaces).unsubscribeNamespace("myprop/clust/ns1", "my-sub");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("unsubscribe -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1"));
verify(mockNamespaces).unsubscribeNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", "my-sub");
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("get-max-producers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxProducersPerTopic("myprop/clust/ns1");
namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
namespaces.run(split("remove-max-producers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).removeMaxProducersPerTopic("myprop/clust/ns1");
namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2"));
verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2);
namespaces.run(split("remove-max-consumers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).removeMaxConsumersPerTopic("myprop/clust/ns1");
namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1");
namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
namespaces.run(split("get-max-unacked-messages-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).getMaxUnackedMessagesPerSubscription("myprop/clust/ns1");
namespaces.run(split("set-max-unacked-messages-per-subscription myprop/clust/ns1 -c 3"));
verify(mockNamespaces).setMaxUnackedMessagesPerSubscription("myprop/clust/ns1", 3);
namespaces.run(split("remove-max-unacked-messages-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).removeMaxUnackedMessagesPerSubscription("myprop/clust/ns1");
namespaces.run(split("get-max-unacked-messages-per-consumer myprop/clust/ns1"));
verify(mockNamespaces).getMaxUnackedMessagesPerConsumer("myprop/clust/ns1");
namespaces.run(split("set-max-unacked-messages-per-consumer myprop/clust/ns1 -c 3"));
verify(mockNamespaces).setMaxUnackedMessagesPerConsumer("myprop/clust/ns1", 3);
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("set-dispatch-rate myprop/clust/ns1 -md -1 -bd -1 -dt 2"));
verify(mockNamespaces).setDispatchRate("myprop/clust/ns1", new DispatchRate(-1, -1, 2));
namespaces.run(split("get-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getDispatchRate("myprop/clust/ns1");
namespaces.run(split("remove-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeDispatchRate("myprop/clust/ns1");
namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20"));
verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new PublishRate(10, 20));
namespaces.run(split("get-publish-rate myprop/clust/ns1"));
verify(mockNamespaces).getPublishRate("myprop/clust/ns1");
namespaces.run(split("remove-publish-rate myprop/clust/ns1"));
verify(mockNamespaces).removePublishRate("myprop/clust/ns1");
namespaces.run(split("set-subscribe-rate myprop/clust/ns1 -sr 2 -st 60"));
verify(mockNamespaces).setSubscribeRate("myprop/clust/ns1", new SubscribeRate(2, 60));
namespaces.run(split("get-subscribe-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscribeRate("myprop/clust/ns1");
namespaces.run(split("remove-subscribe-rate myprop/clust/ns1"));
verify(mockNamespaces).removeSubscribeRate("myprop/clust/ns1");
namespaces.run(split("set-subscription-dispatch-rate myprop/clust/ns1 -md -1 -bd -1 -dt 2"));
verify(mockNamespaces).setSubscriptionDispatchRate("myprop/clust/ns1", new DispatchRate(-1, -1, 2));
namespaces.run(split("get-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionDispatchRate("myprop/clust/ns1");
namespaces.run(split("remove-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeSubscriptionDispatchRate("myprop/clust/ns1");
namespaces.run(split("get-compaction-threshold myprop/clust/ns1"));
verify(mockNamespaces).getCompactionThreshold("myprop/clust/ns1");
namespaces.run(split("remove-compaction-threshold myprop/clust/ns1"));
verify(mockNamespaces).removeCompactionThreshold("myprop/clust/ns1");
namespaces.run(split("set-compaction-threshold myprop/clust/ns1 -t 1G"));
verify(mockNamespaces).setCompactionThreshold("myprop/clust/ns1", 1024 * 1024 * 1024);
namespaces.run(split("get-offload-threshold myprop/clust/ns1"));
verify(mockNamespaces).getOffloadThreshold("myprop/clust/ns1");
namespaces.run(split("set-offload-threshold myprop/clust/ns1 -s 1G"));
verify(mockNamespaces).setOffloadThreshold("myprop/clust/ns1", 1024 * 1024 * 1024);
namespaces.run(split("get-offload-deletion-lag myprop/clust/ns1"));
verify(mockNamespaces).getOffloadDeleteLagMs("myprop/clust/ns1");
namespaces.run(split("set-offload-deletion-lag myprop/clust/ns1 -l 1d"));
verify(mockNamespaces).setOffloadDeleteLag("myprop/clust/ns1", 24 * 60 * 60, TimeUnit.SECONDS);
namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
namespaces.run(split(
"set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
namespaces.run(split("get-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
namespaces.run(split("remove-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).removeNamespaceMessageTTL("myprop/clust/ns1");
namespaces.run(split("set-deduplication-snapshot-interval myprop/clust/ns1 -i 1000"));
verify(mockNamespaces).setDeduplicationSnapshotInterval("myprop/clust/ns1", 1000);
namespaces.run(split("get-deduplication-snapshot-interval myprop/clust/ns1"));
verify(mockNamespaces).getDeduplicationSnapshotInterval("myprop/clust/ns1");
namespaces.run(split("remove-deduplication-snapshot-interval myprop/clust/ns1"));
verify(mockNamespaces).removeDeduplicationSnapshotInterval("myprop/clust/ns1");
}
@Test
public void namespacesCreateV1() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("create my-prop/my-cluster/my-namespace"));
verify(mockNamespaces).createNamespace("my-prop/my-cluster/my-namespace");
}
@Test
public void namespacesCreateV1WithBundlesAndClusters() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("create my-prop/my-cluster/my-namespace --bundles 5 --clusters a,b,c"));
verify(mockNamespaces).createNamespace("my-prop/my-cluster/my-namespace", 5);
verify(mockNamespaces).setNamespaceReplicationClusters("my-prop/my-cluster/my-namespace", Sets.newHashSet("a", "b", "c"));
}
@Test
public void namespacesCreate() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("create my-prop/my-namespace"));
Policies policies = new Policies();
policies.bundles = null;
verify(mockNamespaces).createNamespace("my-prop/my-namespace", policies);
}
@Test
public void namespacesCreateWithBundlesAndClusters() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("create my-prop/my-namespace --bundles 5 --clusters a,b,c"));
Policies policies = new Policies();
policies.bundles = new BundlesData(5);
policies.replication_clusters = Sets.newHashSet("a", "b", "c");
verify(mockNamespaces).createNamespace("my-prop/my-namespace", policies);
}
@Test
public void resourceQuotas() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
ResourceQuotas mockResourceQuotas = mock(ResourceQuotas.class);
when(admin.resourceQuotas()).thenReturn(mockResourceQuotas);
CmdResourceQuotas cmdResourceQuotas = new CmdResourceQuotas(() -> admin);
ResourceQuota quota = new ResourceQuota();
quota.setMsgRateIn(10);
quota.setMsgRateOut(20);
quota.setBandwidthIn(10000);
quota.setBandwidthOut(20000);
quota.setMemory(100);
quota.setDynamic(false);
cmdResourceQuotas.run(split("get"));
verify(mockResourceQuotas).getDefaultResourceQuota();
cmdResourceQuotas.run(split("set -mi 10 -mo 20 -bi 10000 -bo 20000 -mem 100"));
verify(mockResourceQuotas).setDefaultResourceQuota(quota);
// reset mocks
mockResourceQuotas = mock(ResourceQuotas.class);
when(admin.resourceQuotas()).thenReturn(mockResourceQuotas);
cmdResourceQuotas = new CmdResourceQuotas(() -> admin);
cmdResourceQuotas.run(split("get --namespace myprop/clust/ns1 --bundle 0x80000000_0xffffffff"));
verify(mockResourceQuotas).getNamespaceBundleResourceQuota("myprop/clust/ns1", "0x80000000_0xffffffff");
cmdResourceQuotas.run(split(
"set --namespace myprop/clust/ns1 --bundle 0x80000000_0xffffffff -mi 10 -mo 20 -bi 10000 -bo 20000 -mem 100"));
verify(mockResourceQuotas).setNamespaceBundleResourceQuota("myprop/clust/ns1", "0x80000000_0xffffffff", quota);
cmdResourceQuotas
.run(split("reset-namespace-bundle-quota --namespace myprop/clust/ns1 --bundle 0x80000000_0xffffffff"));
verify(mockResourceQuotas).resetNamespaceBundleResourceQuota("myprop/clust/ns1", "0x80000000_0xffffffff");
}
@Test
public void namespaceIsolationPolicy() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Clusters mockClusters = mock(Clusters.class);
when(admin.clusters()).thenReturn(mockClusters);
CmdNamespaceIsolationPolicy nsIsolationPoliciesCmd = new CmdNamespaceIsolationPolicy(() -> admin);
nsIsolationPoliciesCmd.run(split("brokers use"));
verify(mockClusters).getBrokersWithNamespaceIsolationPolicy("use");
nsIsolationPoliciesCmd.run(split("broker use --broker my-broker"));
verify(mockClusters).getBrokerWithNamespaceIsolationPolicy("use", "my-broker");
}
@Test
public void topics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Topics mockTopics = mock(Topics.class);
when(admin.topics()).thenReturn(mockTopics);
Schemas mockSchemas = mock(Schemas.class);
when(admin.schemas()).thenReturn(mockSchemas);
CmdTopics cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("delete persistent://myprop/clust/ns1/ds1 -d"));
verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
verify(mockSchemas).deleteSchema("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("unload persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("list myprop/clust/ns1"));
verify(mockTopics).getList("myprop/clust/ns1");
cmdTopics.run(split("subscriptions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false);
cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false, false);
cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold"));
verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1"
, new BacklogQuota(10L, BacklogQuota.RetentionPolicy.producer_request_hold));
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("info-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false, false);
cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
cmdTopics.run(split("skip persistent://myprop/clust/ns1/ds1 -s sub1 -n 100"));
verify(mockTopics).skipMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 2 -st 60"));
verify(mockTopics).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(2, 60));
cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));
cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
cmdTopics.run(split("get-partitioned-topic-metadata persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1 -d"));
verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
verify(mockSchemas, times(2)).deleteSchema("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
cmdTopics.run(split("enable-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("disable-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopics).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", new DispatchRate(-1, -1, 2));
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover"));
verify(mockTopics).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
cmdTopics.run(split("get-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12"));
verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
new DispatchRate(10,11,12));
cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-deduplication-enabled persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics, times(2)).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
verify(mockTopics).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
new DelayedDeliveryPolicies(10000, true));
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
cmdTopics.run(split(
"set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
, "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);
cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics, times(2))
.getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics, times(2)).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
verify(mockTopics, times(2)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics, times(2)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics, times(2)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics, times(2)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k"));
verify(mockTopics).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-max-message-size persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxMessageSize("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-message-size persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxMessageSize("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-message-size persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("get-message-by-id persistent://myprop/clust/ns1/ds1 -l 10 -e 2"));
verify(mockTopics).getMessageById("persistent://myprop/clust/ns1/ds1", 10,2);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getRetention("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
verify(mockTopics).setRetention("persistent://myprop/clust/ns1/ds1",
new RetentionPolicies(10, 20));
cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeRetention("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("remove-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeRetention("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopics).setDispatchRate("persistent://myprop/clust/ns1/ds1", new DispatchRate(-1, -1, 2));
cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxProducers("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-producers persistent://myprop/clust/ns1/ds1 -p 99"));
verify(mockTopics).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxConsumers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-consumers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxConsumers("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99"));
verify(mockTopics).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -i 99"));
verify(mockTopics).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 99);
cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
cmdTopics.run(split("get-max-subscriptions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions persistent://myprop/clust/ns1/ds1 -m 100"));
verify(mockTopics).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 100);
cmdTopics.run(split("remove-max-subscriptions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
verify(mockTopics).setPersistence("persistent://myprop/clust/ns1/ds1", new PersistencePolicies(2, 1, 1, 100.0d));
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removePersistence("persistent://myprop/clust/ns1/ds1");
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
@Override
public boolean matches(Long timestamp) {
long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
return true;
}
return false;
}
}
cmdTopics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
longThat(new TimestampMatcher()));
cmdTopics.run(split("last-message-id persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getLastMessageId(eq("persistent://myprop/clust/ns1/ds1"));
cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10"));
verify(mockTopics).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("reset-cursor persistent://myprop/clust/ns1/ds2 -s sub1 -m 1:1 -e"));
verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds2"), eq("sub1")
, eq(new MessageIdImpl(1, 1, -1)), eq(true));
cmdTopics.run(split("get-maxProducers persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMaxProducers("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 --applied"));
verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMaxConsumers("persistent://myprop/clust/ns1/ds1", true);
}
@Test
public void persistentTopics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Topics mockTopics = mock(Topics.class);
when(admin.topics()).thenReturn(mockTopics);
CmdPersistentTopics topics = new CmdPersistentTopics(() -> admin);
topics.run(split("delete persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
topics.run(split("unload persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
topics.run(split("list myprop/clust/ns1"));
verify(mockTopics).getList("myprop/clust/ns1");
topics.run(split("subscriptions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
topics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false);
topics.run(split("stats persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1");
topics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false);
topics.run(split("info-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
topics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true);
topics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
topics.run(split("skip-all persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
topics.run(split("skip persistent://myprop/clust/ns1/ds1 -s sub1 -n 100"));
verify(mockTopics).skipMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
topics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
topics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
topics.run(split("list-partitioned-topics myprop/clust/ns1"));
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
topics.run(split("get-partitioned-topic-metadata persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
topics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
topics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
@Override
public boolean matches(Long timestamp) {
long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
return true;
}
return false;
}
}
topics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
longThat(new TimestampMatcher()));
}
@Test
public void nonPersistentTopics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
NonPersistentTopics mockTopics = mock(NonPersistentTopics.class);
when(admin.nonPersistentTopics()).thenReturn(mockTopics);
CmdNonPersistentTopics topics = new CmdNonPersistentTopics(() -> admin);
topics.run(split("stats non-persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getStats("non-persistent://myprop/clust/ns1/ds1");
topics.run(split("stats-internal non-persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("non-persistent://myprop/clust/ns1/ds1");
topics.run(split("create-partitioned-topic non-persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1", 32);
topics.run(split("list myprop/clust/ns1"));
verify(mockTopics).getList("myprop/clust/ns1");
topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 0x23d70a30_0x26666658"));
verify(mockTopics).getListInBundle("myprop/clust/ns1", "0x23d70a30_0x26666658");
}
@Test
public void bookies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Bookies mockBookies = mock(Bookies.class);
doReturn(mockBookies).when(admin).bookies();
doReturn(new BookiesClusterInfo(Collections.emptyList())).when(mockBookies).getBookies();
doReturn(new BookiesRackConfiguration()).when(mockBookies).getBookiesRackInfo();
CmdBookies bookies = new CmdBookies(() -> admin);
bookies.run(split("racks-placement"));
verify(mockBookies).getBookiesRackInfo();
bookies.run(split("list-bookies"));
verify(mockBookies).getBookies();
bookies.run(split("get-bookie-rack --bookie my-bookie:3181"));
verify(mockBookies).getBookieRackInfo("my-bookie:3181");
bookies.run(split("delete-bookie-rack --bookie my-bookie:3181"));
verify(mockBookies).deleteBookieRackInfo("my-bookie:3181");
bookies.run(split("set-bookie-rack --group my-group --bookie my-bookie:3181 --rack rack-1 --hostname host-1"));
verify(mockBookies).updateBookieRackInfo("my-bookie:3181", "my-group", new BookieInfo("rack-1", "host-1"));
}
@Test
public void requestTimeout() throws Exception {
Properties properties = new Properties();
properties.put("webServiceUrl", "http://localhost:2181");
PulsarAdminTool tool = new PulsarAdminTool(properties);
try {
tool.run("--request-timeout 1".split(" "));
} catch (Exception e) {
//Ok
}
Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
adminBuilderField.setAccessible(true);
PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
Field requestTimeoutField =
PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeout");
requestTimeoutField.setAccessible(true);
int requestTimeout = (int) requestTimeoutField.get(builder);
Field requestTimeoutUnitField =
PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeoutUnit");
requestTimeoutUnitField.setAccessible(true);
TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(builder);
assertEquals(1, requestTimeout);
assertEquals(TimeUnit.SECONDS, requestTimeoutUnit);
}
@Test
public void testAuthTlsWithJsonParam() throws Exception {
Properties properties = new Properties();
properties.put("authPlugin", AuthenticationTls.class.getName());
Map<String, String> paramMap = Maps.newHashMap();
final String certFilePath = "/my-file:role=name.cert";
final String keyFilePath = "/my-file:role=name.key";
paramMap.put("tlsCertFile", certFilePath);
paramMap.put("tlsKeyFile", keyFilePath);
final String paramStr = ObjectMapperFactory.getThreadLocal().writeValueAsString(paramMap);
properties.put("authParams", paramStr);
properties.put("webServiceUrl", "http://localhost:2181");
PulsarAdminTool tool = new PulsarAdminTool(properties);
try {
tool.run("brokers list use".split(" "));
} catch (Exception e) {
// Ok
}
// validate Authentication-tls has been configured
Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
adminBuilderField.setAccessible(true);
PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
Field confField = PulsarAdminBuilderImpl.class.getDeclaredField("conf");
confField.setAccessible(true);
ClientConfigurationData conf = (ClientConfigurationData) confField.get(builder);
AuthenticationTls atuh = (AuthenticationTls) conf.getAuthentication();
assertEquals(atuh.getCertFilePath(), certFilePath);
assertEquals(atuh.getKeyFilePath(), keyFilePath);
properties.put("authParams", String.format("tlsCertFile:%s,tlsKeyFile:%s", certFilePath, keyFilePath));
tool = new PulsarAdminTool(properties);
try {
tool.run("brokers list use".split(" "));
} catch (Exception e) {
// Ok
}
builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
conf = (ClientConfigurationData) confField.get(builder);
atuh = (AuthenticationTls) conf.getAuthentication();
assertNull(atuh.getCertFilePath());
assertNull(atuh.getKeyFilePath());
}
@Test
void proxy() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
ProxyStats mockProxyStats = mock(ProxyStats.class);
doReturn(mockProxyStats).when(admin).proxyStats();
CmdProxyStats proxyStats = new CmdProxyStats(() -> admin);
proxyStats.run(split("connections"));
verify(mockProxyStats).getConnections();
proxyStats.run(split("topics"));
verify(mockProxyStats).getTopics();
}
String[] split(String s) {
return s.split(" ");
}
}