blob: 7cf84673be86aff174677283c7bda8b762e9c10f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilter2Test;
import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.Topics.QueryParam;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-admin")
public class AdminApi2Test extends MockedPulsarServiceBaseTest {
private MockedPulsarService mockPulsarSetup;
private boolean restartClusterAfterTest;
private int usageCount;
private String defaultNamespace;
private String defaultTenant;
@BeforeClass
@Override
public void setup() throws Exception {
super.internalSetup();
// create otherbroker to test redirect on calls that need
// namespace ownership
mockPulsarSetup = new MockedPulsarService(this.conf);
mockPulsarSetup.setup();
setupClusters();
}
@Test
public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws Exception {
super.internalCleanup();
conf.setMaxTopicsPerNamespace(3);
super.internalSetup();
String topic = "persistent://testTenant/ns1/test_create_topic_v";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
Sets.newHashSet("test"));
// check producer/consumer auto create non-partitioned topic
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
pulsarClient.newProducer().topic(topic + "1").create().close();
pulsarClient.newProducer().topic(topic + "2").create().close();
pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
try {
pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub")
.subscribeAsync().get(5, TimeUnit.SECONDS);
Assert.fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
}
// reset configuration
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(1);
}
@Override
protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();
configureDefaults(conf);
return conf;
}
void configureDefaults(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setLoadBalancerEnabled(true);
conf.setEnableNamespaceIsolationUpdateOnTime(true);
conf.setAllowOverrideEntryFilters(true);
conf.setEntryFilterNames(List.of());
conf.setMaxNumPartitionsPerPartitionedTopic(0);
}
@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
if (mockPulsarSetup != null) {
mockPulsarSetup.cleanup();
mockPulsarSetup = null;
}
resetConfig();
}
@AfterMethod(alwaysRun = true)
public void resetClusters() throws Exception {
if (restartClusterAfterTest) {
restartClusterAndResetUsageCount();
} else {
try {
cleanupCluster();
} catch (Exception e) {
log.error("Failed to clean up state by deleting namespaces and tenants after test. "
+ "Restarting the test broker.", e);
restartClusterAndResetUsageCount();
}
}
}
private void cleanupCluster() throws Exception {
pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
for (String tenant : admin.tenants().getTenants()) {
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
deleteNamespaceWithRetry(namespace, true, admin, pulsar,
mockPulsarSetup.getPulsar());
}
try {
admin.tenants().deleteTenant(tenant, true);
} catch (Exception e) {
log.error("Failed to delete tenant {} after test", tenant, e);
String zkDirectory = "/managed-ledgers/" + tenant;
try {
log.info("Listing {} to see if existing keys are preventing deletion.", zkDirectory);
pulsar.getPulsarResources().getLocalMetadataStore().get().getChildren(zkDirectory)
.get(5, TimeUnit.SECONDS).forEach(key -> log.info("Child key '{}'", key));
} catch (Exception ignore) {
log.error("Failed to list tenant {} ZK directory {} after test", tenant, zkDirectory, e);
}
throw e;
}
}
for (String cluster : admin.clusters().getClusters()) {
admin.clusters().deleteCluster(cluster);
}
configureDefaults(conf);
setupClusters();
}
private void restartClusterAfterTest() {
restartClusterAfterTest = true;
}
private void restartClusterAndResetUsageCount() throws Exception {
cleanup();
restartClusterAfterTest = false;
usageCount = 0;
setup();
}
private void restartClusterIfReused() throws Exception {
if (usageCount > 1) {
restartClusterAndResetUsageCount();
}
}
@BeforeMethod
public void increaseUsageCount() {
usageCount++;
}
private void setupClusters() throws PulsarAdminException {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
defaultTenant = newUniqueName("prop-xyz");
admin.tenants().createTenant(defaultTenant, tenantInfo);
defaultNamespace = defaultTenant + "/ns1";
admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
}
@DataProvider(name = "topicType")
public Object[][] topicTypeProvider() {
return new Object[][] { { TopicDomain.persistent.value() }, { TopicDomain.non_persistent.value() } };
}
@DataProvider(name = "namespaceNames")
public Object[][] namespaceNameProvider() {
return new Object[][] { { "ns1" }, { "global" } };
}
@DataProvider(name = "isV1")
public Object[][] isV1() {
return new Object[][] { { true }, { false } };
}
/**
* It verifies http error code when updating partitions to ensure compatibility.
*/
@Test
public void testUpdatePartitionsErrorCode() {
final String nonPartitionedTopicName = "non-partitioned-topic-name" + UUID.randomUUID();
try {
// Update a non-partitioned topic
admin.topics().updatePartitionedTopic(nonPartitionedTopicName, 2);
Assert.fail("Expect conflict exception.");
} catch (PulsarAdminException ex) {
Assert.assertEquals(ex.getStatusCode(), 409 /*Conflict*/);
Assert.assertTrue(ex instanceof PulsarAdminException.ConflictException);
}
}
/**
* <pre>
* It verifies increasing partitions for partitioned-topic.
* 1. create a partitioned-topic
* 2. update partitions with larger number of partitions
* 3. verify: getPartitionedMetadata and check number of partitions
* 4. verify: this api creates existing subscription to new partitioned-topics
* so, message will not be lost in new partitions
* a. start producer and produce messages
* b. check existing subscription for new topics and it should have backlog msgs
*
* </pre>
*
* @throws Exception
*/
@Test
public void testIncrementPartitionsOfTopic() throws Exception {
final String topicName = "increment-partitionedTopic";
final String subName1 = topicName + "-my-sub-1/encode";
final String subName2 = topicName + "-my-sub-2/encode";
final int startPartitions = 4;
final int newPartitions = 8;
final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + topicName;
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
// validate partition topic is created
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
startPartitions);
// create consumer and subscriptions : check subscriptions
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList(subName1));
Consumer<byte[]> consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2)
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(new HashSet<>(admin.topics().getSubscriptions(partitionedTopicName)),
Set.of(subName1, subName2));
// (1) update partitions
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions);
// verify new partitions have been created
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
newPartitions);
// (2) No Msg loss: verify new partitions have the same existing subscription names
final String newPartitionTopicName = TopicName.get(partitionedTopicName).getPartition(startPartitions + 1)
.toString();
// (3) produce messages to all partitions including newly created partitions (RoundRobin)
Producer<byte[]> producer = client.newProducer()
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
final int totalMessages = newPartitions * 2;
for (int i = 0; i < totalMessages; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
// (4) verify existing subscription has not lost any message: create new consumer with sub-2: it will load all
// newly created partition topics
consumer2.close();
consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2)
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(new HashSet<>(admin.topics().getSubscriptions(newPartitionTopicName)),
Set.of(subName1, subName2));
assertEquals(new HashSet<>(admin.topics().getList(defaultNamespace)).size(), newPartitions);
// test cumulative stats for partitioned topic
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
assertEquals(topicStats.getSubscriptions().keySet(), new TreeSet<>(Lists.newArrayList(subName1, subName2)));
assertEquals(topicStats.getSubscriptions().get(subName2).getConsumers().size(), 1);
assertEquals(topicStats.getSubscriptions().get(subName2).getMsgBacklog(), totalMessages);
assertEquals(topicStats.getPublishers().size(), 1);
assertEquals(topicStats.getPartitions(), new HashMap<>());
// (5) verify: each partition should have backlog
topicStats = admin.topics().getPartitionedStats(partitionedTopicName, true);
assertEquals(topicStats.getMetadata().partitions, newPartitions);
Set<String> partitionSet = new HashSet<>();
for (int i = 0; i < newPartitions; i++) {
partitionSet.add(partitionedTopicName + "-partition-" + i);
}
assertEquals(topicStats.getPartitions().keySet(), partitionSet);
for (int i = 0; i < newPartitions; i++) {
TopicStats partitionStats = topicStats.getPartitions()
.get(TopicName.get(partitionedTopicName).getPartition(i).toString());
assertEquals(partitionStats.getPublishers().size(), 1);
assertEquals(partitionStats.getSubscriptions().get(subName2).getConsumers().size(), 1);
assertEquals(partitionStats.getSubscriptions().get(subName2).getMsgBacklog(), 2, 1);
}
producer.close();
consumer1.close();
consumer2.close();
consumer2.close();
}
@Test
public void testTopicPoliciesWithMultiBroker() throws Exception {
restartClusterAfterTest();
//setup cluster with 3 broker
admin.clusters().updateCluster("test",
ClusterData.builder().serviceUrl((pulsar.getWebServiceAddress() + ",localhost:1026," + "localhost:2050")).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
String tenantName = newUniqueName("prop-xyz2");
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
ServiceConfiguration config2 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
ServiceConfiguration config3 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
PulsarService pulsar3 = pulsarTestContext.getPulsarService();
@Cleanup
PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
@Cleanup
PulsarAdmin admin3 = PulsarAdmin.builder().serviceHttpUrl(pulsar3.getWebServiceAddress()).build();
//for partitioned topic, we can get topic policies from every broker
final String topic = "persistent://" + tenantName + "/ns1/" + newUniqueName("test");
int partitionNum = 3;
admin.topics().createPartitionedTopic(topic, partitionNum);
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
setTopicPoliciesAndValidate(admin2, admin3, topic);
//for non-partitioned topic, we can get topic policies from every broker
final String topic2 = "persistent://" + tenantName + "/ns1/" + newUniqueName("test");
pulsarClient.newConsumer().topic(topic2).subscriptionName("sub").subscribe().close();
setTopicPoliciesAndValidate(admin2, admin3, topic2);
}
private void setTopicPoliciesAndValidate(PulsarAdmin admin2
, PulsarAdmin admin3, String topic) throws Exception {
admin.topics().setMaxUnackedMessagesOnConsumer(topic, 100);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
admin.topics().setMaxConsumers(topic, 101);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic)));
admin.topics().setMaxProducers(topic, 102);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxProducers(topic)));
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
assertEquals(admin2.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
assertEquals(admin3.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100);
assertEquals(admin.topics().getMaxConsumers(topic).intValue(), 101);
assertEquals(admin2.topics().getMaxConsumers(topic).intValue(), 101);
assertEquals(admin3.topics().getMaxConsumers(topic).intValue(), 101);
assertEquals(admin.topics().getMaxProducers(topic).intValue(), 102);
assertEquals(admin2.topics().getMaxProducers(topic).intValue(), 102);
assertEquals(admin3.topics().getMaxProducers(topic).intValue(), 102);
}
/**
* verifies admin api command for non-persistent topic. It verifies: partitioned-topic, stats
*
* @throws Exception
*/
@Test
public void nonPersistentTopics() throws Exception {
final String topicName = "nonPersistentTopic";
final String nonPersistentTopicName = "non-persistent://" + defaultNamespace + "/" + topicName;
// Force to create a topic
publishMessagesOnTopic(nonPersistentTopicName, 0, 0);
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(nonPersistentTopicName).subscriptionName("my-sub")
.subscribe();
publishMessagesOnTopic(nonPersistentTopicName, 10, 0);
NonPersistentTopicStats topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
assertEquals(topicStats.getSubscriptions().keySet(), Set.of("my-sub"));
assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getMsgDropRate(), 0);
assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId());
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(nonPersistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Set.of("my-sub"));
consumer.close();
topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
assertFalse(topicStats.getSubscriptions().containsKey("my-sub"));
assertEquals(topicStats.getPublishers().size(), 0);
// test partitioned-topic
final String partitionedTopicName = "non-persistent://" + defaultNamespace + "/paritioned";
admin.topics().createPartitionedTopic(partitionedTopicName, 5);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
}
private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
producer.close();
}
/**
* verifies validation on persistent-policies
*
* @throws Exception
*/
@Test
public void testSetPersistencePolicies() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
admin.namespaces().createNamespace(namespace, Set.of("test"));
assertNull(admin.namespaces().getPersistence(namespace));
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 10.0));
assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0));
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
}
// make sure policies has not been changed
assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0));
}
/**
* validates update of persistent-policies reflects on managed-ledger and managed-cursor
*
* @throws Exception
*/
@Test
public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = "persistent://" + namespace + "/topic1";
admin.namespaces().createNamespace(namespace, Set.of("test"));
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0));
assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0));
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next();
final double newThrottleRate = 100;
final int newEnsembleSize = 5;
admin.namespaces().setPersistence(namespace, new PersistencePolicies(newEnsembleSize, 3, 3, newThrottleRate));
retryStrategically((test) -> managedLedger.getConfig().getEnsembleSize() == newEnsembleSize
&& cursor.getThrottleMarkDelete() != newThrottleRate, 5, 200);
// (1) verify cursor.markDelete has been updated
assertEquals(cursor.getThrottleMarkDelete(), newThrottleRate);
// (2) verify new ledger creation takes new config
producer.close();
consumer.close();
}
/**
* Verify unloading topic
*
* @throws Exception
*/
@Test(dataProvider = "topicType")
public void testUnloadTopic(final String topicType) throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = topicType + "://" + namespace + "/topic1";
admin.namespaces().createNamespace(namespace, Set.of("test"));
// create a topic by creating a producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).join().get();
final boolean isPersistentTopic = topic instanceof PersistentTopic;
// (1) unload the topic
unloadTopic(topicName);
// topic must be removed from map
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// recreation of producer will load the topic again
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName).create();
topic = pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topic);
// unload the topic
unloadTopic(topicName);
// producer will retry and recreate the topic
Awaitility.await().until(() -> pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// topic should be loaded by this time
topic = pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topic);
}
private void unloadTopic(String topicName) throws Exception {
admin.topics().unload(topicName);
}
/**
* Verifies reset-cursor at specific position using admin-api.
*
* <pre>
* 1. Publish 50 messages
* 2. Consume 20 messages
* 3. reset cursor position on 10th message
* 4. consume 40 messages from reset position
* </pre>
*
* @param namespaceName
* @throws Exception
*/
@Test(dataProvider = "namespaceNames", timeOut = 30000)
public void testResetCursorOnPosition(String namespaceName) throws Exception {
restartClusterAfterTest();
final String topicName = "persistent://" + defaultTenant + "/use/" + namespaceName + "/resetPosition";
final int totalProducedMessages = 50;
// set retention
admin.namespaces().setRetention(defaultNamespace, new RetentionPolicies(10, 10));
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));
publishMessagesOnPersistentTopic(topicName, totalProducedMessages, 0);
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
Message<byte[]> message = null;
MessageIdImpl resetMessageId = null;
int resetPositionId = 10;
for (int i = 0; i < 20; i++) {
message = consumer.receive(1, TimeUnit.SECONDS);
consumer.acknowledge(message);
if (i == resetPositionId) {
resetMessageId = (MessageIdImpl) message.getMessageId();
}
}
// close consumer which will clean up internal-receive-queue
consumer.close();
// messages should still be available due to retention
MessageIdImpl messageId = new MessageIdImpl(
resetMessageId.getLedgerId(),
resetMessageId.getEntryId(),
-1);
// reset position at resetMessageId
admin.topics().resetCursor(topicName, "my-sub", messageId);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared).subscribe();
MessageIdImpl msgId2 = (MessageIdImpl) consumer.receive(1, TimeUnit.SECONDS).getMessageId();
assertEquals(resetMessageId, msgId2);
int receivedAfterReset = 1; // start with 1 because we have already received 1 msg
for (int i = 0; i < totalProducedMessages; i++) {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
if (message == null) {
break;
}
consumer.acknowledge(message);
++receivedAfterReset;
}
assertEquals(receivedAfterReset, totalProducedMessages - resetPositionId);
// invalid topic name
try {
admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId);
fail("It should have failed due to invalid topic name");
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains(topicName));
// Ok
}
// invalid cursor name
try {
admin.topics().resetCursor(topicName, "invalid-sub", messageId);
fail("It should have failed due to invalid subscription name");
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains("invalid-sub"));
// Ok
}
// invalid position
try {
messageId = new MessageIdImpl(0, 0, -1);
admin.topics().resetCursor(topicName, "my-sub", messageId);
} catch (PulsarAdminException.PreconditionFailedException e) {
fail("It shouldn't fail for a invalid position");
}
consumer.close();
}
@Test
public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + newUniqueName("parttopic");
admin.topics().createPartitionedTopic(partitionedTopicName, 4);
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared).subscribe();
try {
admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
fail();
} catch (PulsarAdminException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
"Condition doesn't match. Actual message:" + e.getMessage());
}
}
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
producer.close();
}
@Test(timeOut = 20000)
public void testMaxConsumersOnSubApi() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(namespace, Set.of("test"));
assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
Awaitility.await().untilAsserted(() -> {
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10);
});
admin.namespaces().removeMaxConsumersPerSubscription(namespace);
Awaitility.await().untilAsserted(() ->
admin.namespaces().getMaxConsumersPerSubscription(namespace));
}
/**
* It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
*
* @throws Exception
*/
@Test
public void testLoadReportApi() throws Exception {
restartClusterAfterTest();
this.conf.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
@Cleanup("cleanup")
MockedPulsarService mockPulsarSetup1 = new MockedPulsarService(this.conf);
mockPulsarSetup1.setup();
PulsarAdmin simpleLoadManagerAdmin = mockPulsarSetup1.getAdmin();
assertNotNull(simpleLoadManagerAdmin.brokerStats().getLoadReport());
this.conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
@Cleanup("cleanup")
MockedPulsarService mockPulsarSetup2 = new MockedPulsarService(this.conf);
mockPulsarSetup2.setup();
PulsarAdmin modularLoadManagerAdmin = mockPulsarSetup2.getAdmin();
assertNotNull(modularLoadManagerAdmin.brokerStats().getLoadReport());
}
@Test
public void testPeerCluster() throws Exception {
admin.clusters().createCluster("us-west1",
ClusterData.builder().serviceUrl("http://broker.messaging.west1.example.com:8080").build());
admin.clusters().createCluster("us-west2",
ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
admin.clusters().createCluster("us-east1",
ClusterData.builder().serviceUrl("http://broker.messaging.east1.example.com:8080").build());
admin.clusters().createCluster("us-east2",
ClusterData.builder().serviceUrl("http://broker.messaging.east2.example.com:8080").build());
admin.clusters().updatePeerClusterNames("us-west1", new LinkedHashSet<>(List.of("us-west2")));
assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(), Set.of("us-west2"));
assertNull(admin.clusters().getCluster("us-west2").getPeerClusterNames());
// update cluster with duplicate peer-clusters in the list
admin.clusters().updatePeerClusterNames("us-west1",
new LinkedHashSet<>(List.of("us-west2", "us-east1", "us-west2", "us-east1", "us-west2", "us-east1")));
assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(),
List.of("us-west2", "us-east1"));
admin.clusters().updatePeerClusterNames("us-west1", null);
assertNull(admin.clusters().getCluster("us-west1").getPeerClusterNames());
// Check name validation
try {
admin.clusters().updatePeerClusterNames("us-west1",
new LinkedHashSet<>(List.of("invalid-cluster")));
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
// Cluster itself can't be part of peer-list
try {
admin.clusters().updatePeerClusterNames("us-west1", new LinkedHashSet<>(List.of("us-west1")));
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
}
/**
* It validates that peer-cluster can't coexist in replication-cluster list
*
* @throws Exception
*/
@Test
public void testReplicationPeerCluster() throws Exception {
restartClusterAfterTest();
admin.clusters().createCluster("us-west1",
ClusterData.builder().serviceUrl("http://broker.messaging.west1.example.com:8080").build());
admin.clusters().createCluster("us-west2",
ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
admin.clusters().createCluster("us-west3",
ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
admin.clusters().createCluster("us-west4",
ClusterData.builder().serviceUrl("http://broker.messaging.west2.example.com:8080").build());
admin.clusters().createCluster("us-east1",
ClusterData.builder().serviceUrl("http://broker.messaging.east1.example.com:8080").build());
admin.clusters().createCluster("us-east2",
ClusterData.builder().serviceUrl("http://broker.messaging.east2.example.com:8080").build());
admin.clusters().createCluster("global", ClusterData.builder().build());
List<String> allClusters = admin.clusters().getClusters();
Collections.sort(allClusters);
assertEquals(allClusters,
List.of("test", "us-east1", "us-east2", "us-west1", "us-west2", "us-west3", "us-west4"));
final String property = newUniqueName("peer-prop");
Set<String> allowedClusters = Set.of("us-west1", "us-west2", "us-west3", "us-west4", "us-east1",
"us-east2", "global");
TenantInfoImpl propConfig = new TenantInfoImpl(Set.of("test"), allowedClusters);
admin.tenants().createTenant(property, propConfig);
final String namespace = property + "/global/conflictPeer";
admin.namespaces().createNamespace(namespace);
admin.clusters().updatePeerClusterNames("us-west1",
new LinkedHashSet<>(List.of("us-west2", "us-west3")));
assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(),
List.of("us-west2", "us-west3"));
// (1) no conflicting peer
Set<String> clusterIds = Set.of("us-east1", "us-east2");
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
// (2) conflicting peer
clusterIds = Set.of("us-west2", "us-west3", "us-west1");
try {
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
fail("Peer-cluster can't coexist in replication cluster list");
} catch (PulsarAdminException.ConflictException e) {
// Ok
}
clusterIds = Set.of("us-west2", "us-west3");
// no peer coexist in replication clusters
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
clusterIds = Set.of("us-west1", "us-west4");
// no peer coexist in replication clusters
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
}
@Test
public void clusterFailureDomain() throws PulsarAdminException {
final String cluster = pulsar.getConfiguration().getClusterName();
// create
FailureDomain domain = FailureDomain.builder()
.brokers(Set.of("b1", "b2", "b3"))
.build();
admin.clusters().createFailureDomain(cluster, "domain-1", domain);
admin.clusters().updateFailureDomain(cluster, "domain-1", domain);
assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain);
Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster);
assertEquals(domains.size(), 1);
assertTrue(domains.containsKey("domain-1"));
try {
// try to create domain with already registered brokers
admin.clusters().createFailureDomain(cluster, "domain-2", domain);
fail("should have failed because of brokers are already registered");
} catch (PulsarAdminException.ConflictException e) {
// Ok
}
admin.clusters().deleteFailureDomain(cluster, "domain-1");
assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty());
admin.clusters().createFailureDomain(cluster, "domain-2", domain);
domains = admin.clusters().getFailureDomains(cluster);
assertEquals(domains.size(), 1);
assertTrue(domains.containsKey("domain-2"));
}
@Test
public void namespaceAntiAffinity() throws PulsarAdminException {
final String namespace = defaultNamespace;
final String antiAffinityGroup = "group";
assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup);
admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
final String ns1 = defaultTenant + "/antiAG1";
final String ns2 = defaultTenant + "/antiAG2";
final String ns3 = defaultTenant + "/antiAG3";
admin.namespaces().createNamespace(ns1, Set.of("test"));
admin.namespaces().createNamespace(ns2, Set.of("test"));
admin.namespaces().createNamespace(ns3, Set.of("test"));
admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup);
admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup);
admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup);
Set<String> namespaces = new HashSet<>(
admin.namespaces().getAntiAffinityNamespaces(defaultTenant, "test", antiAffinityGroup));
assertEquals(namespaces.size(), 3);
assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2));
assertTrue(namespaces.contains(ns3));
List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces(defaultTenant, "test", "invalid-group");
assertEquals(namespaces2.size(), 0);
}
@Test
public void testPersistentTopicList() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = "non-persistent://" + namespace + "/bundle-topic";
admin.namespaces().createNamespace(namespace, 20);
admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of("test"));
int totalTopics = 100;
Set<String> topicNames = new HashSet<>();
for (int i = 0; i < totalTopics; i++) {
topicNames.add(topicName + i);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create();
producer.close();
}
Set<String> topics = new HashSet<>();
String bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundle(TopicName.get(topicName + "0")).getBundleRange();
for (int i = 0; i < totalTopics; i++) {
Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get();
if (bundle.equals(pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundle(TopicName.get(topicName + i)).getBundleRange())) {
topics.add(topic.getName());
}
}
Set<String> topicsInNs = Sets
.newHashSet(
admin.topics().getList(namespace, null, Collections.singletonMap(QueryParam.Bundle, bundle)));
assertEquals(topicsInNs.size(), topics.size());
topicsInNs.removeAll(topics);
assertEquals(topicsInNs.size(), 0);
}
@Test
public void testCreateAndGetTopicProperties() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
admin.namespaces().createNamespace(namespace, 20);
Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
nonPartitionedTopicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
Assert.assertNotNull(properties11);
Assert.assertEquals(properties11.get("key1"), "value1");
final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
Map<String, String> partitionedTopicProperties = new HashMap<>();
partitionedTopicProperties.put("key2", "value2");
admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
Assert.assertNotNull(properties22);
Assert.assertEquals(properties22.get("key2"), "value2");
}
@Test
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);
// create partitioned topic without properties
admin.topics().createPartitionedTopic(topicName, 2);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNull(properties);
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");
// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");
// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
// create topic without properties
admin.topics().createPartitionedTopic(topicNameTwo, 2);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
// remove key of properties on this topic
admin.topics().removeProperties(topicNameTwo, "key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
Map<String, String> topicProp = new HashMap<>();
topicProp.put("key1", "value1");
topicProp.put("key2", "value2");
admin.topics().updateProperties(topicNameTwo, topicProp);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
admin.topics().removeProperties(topicNameTwo, "key1");
topicProp.remove("key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
}
@Test
public void testUpdateNonPartitionedTopicProperties() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);
// create non-partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");
// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");
// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}
@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns2");
final String topicName = "non-persistent://" + namespace + "/topic";
admin.namespaces().createNamespace(namespace, 20);
admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of("test"));
int totalTopics = 100;
Set<String> topicNames = new HashSet<>();
for (int i = 0; i < totalTopics; i++) {
topicNames.add(topicName + i);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create();
producer.close();
}
for (int i = 0; i < totalTopics; i++) {
Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get();
assertNotNull(topic);
}
Set<String> topicsInNs = new HashSet<>(admin.topics().getList(namespace));
assertEquals(topicsInNs.size(), totalTopics);
topicsInNs.removeAll(topicNames);
assertEquals(topicsInNs.size(), 0);
}
@Test
public void testPublishConsumerStats() throws Exception {
final String topicName = "statTopic";
final String subscriberName = topicName + "-my-sub-1";
final String topic = "persistent://" + defaultNamespace + "/" + topicName;
final String producerName = "myProducer";
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName)
.subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.producerName(producerName)
.create();
retryStrategically((test) -> {
TopicStats stats;
try {
stats = admin.topics().getStats(topic);
return stats.getPublishers().size() > 0 && stats.getSubscriptions().get(subscriberName) != null
&& stats.getSubscriptions().get(subscriberName).getConsumers().size() > 0;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 200);
TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.getPublishers().size(), 1);
assertNotNull(topicStats.getPublishers().get(0).getAddress());
assertNotNull(topicStats.getPublishers().get(0).getClientVersion());
assertNotNull(topicStats.getPublishers().get(0).getConnectedSince());
assertNotNull(topicStats.getPublishers().get(0).getProducerName());
assertEquals(topicStats.getPublishers().get(0).getProducerName(), producerName);
SubscriptionStats subscriber = topicStats.getSubscriptions().get(subscriberName);
assertNotNull(subscriber);
assertEquals(subscriber.getConsumers().size(), 1);
ConsumerStats consumerStats = subscriber.getConsumers().get(0);
assertNotNull(consumerStats.getAddress());
assertNotNull(consumerStats.getClientVersion());
assertNotNull(consumerStats.getConnectedSince());
producer.close();
consumer.close();
}
@Test
public void testTenantNameWithUnderscore() throws Exception {
restartClusterAfterTest();
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop_xyz", tenantInfo);
admin.namespaces().createNamespace("prop_xyz/my-namespace", Set.of("test"));
String topic = "persistent://prop_xyz/use/my-namespace/my-topic";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.create();
TopicStats stats = admin.topics().getStats(topic);
assertEquals(stats.getPublishers().size(), 1);
}
@Test
public void testTenantNameWithInvalidCharacters() {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
// If we try to create property with invalid characters, it should fail immediately
try {
admin.tenants().createTenant("prop xyz", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
try {
admin.tenants().createTenant("prop&xyz", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
}
@Test
public void testTenantWithNonexistentClusters() throws Exception {
// Check non-existing cluster
assertFalse(admin.clusters().getClusters().contains("cluster-non-existing"));
Set<String> allowedClusters = Set.of("cluster-non-existing");
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), allowedClusters);
// If we try to create tenant with nonexistent clusters, it should fail immediately
try {
admin.tenants().createTenant("test-tenant", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
assertFalse(admin.tenants().getTenants().contains("test-tenant"));
// Check existing tenant
assertTrue(admin.tenants().getTenants().contains(defaultTenant));
// If we try to update existing tenant with nonexistent clusters, it should fail immediately
try {
admin.tenants().updateTenant(defaultTenant, tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
}
@Test
public void brokerNamespaceIsolationPolicies() throws Exception {
// create
String policyName1 = "policy-1";
String cluster = pulsar.getConfiguration().getClusterName();
String namespaceRegex = "other/" + cluster + "/other.*";
String brokerName = pulsar.getAdvertisedAddress();
String brokerAddress = pulsar.getBrokerId();
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(namespaceRegex))
.primary(Collections.singletonList(brokerName))
.secondary(Collections.singletonList(brokerName + ".*"))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.build();
admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1);
List<BrokerNamespaceIsolationData> brokerIsolationDataList = admin.clusters()
.getBrokersWithNamespaceIsolationPolicy(cluster);
assertEquals(brokerIsolationDataList.size(), 1);
assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);
BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
assertEquals(brokerIsolationData.getBrokerName(), brokerAddress);
assertEquals(brokerIsolationData.getNamespaceRegex().size(), 1);
assertEquals(brokerIsolationData.getNamespaceRegex().get(0), namespaceRegex);
BrokerNamespaceIsolationDataImpl isolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker");
assertFalse(isolationData.isPrimary());
admin.clusters().deleteNamespaceIsolationPolicy(cluster, policyName1);
}
// create 1 namespace:
// 0. without isolation policy configured, lookup will success.
// 1. with matched isolation broker configured and matched, lookup will success.
// 2. update isolation policy, without broker matched, lookup will fail.
@Test
public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
String brokerName = pulsar.getAdvertisedAddress();
String ns1Name = defaultTenant + "/test_ns1_iso_" + System.currentTimeMillis();
admin.namespaces().createNamespace(ns1Name, Set.of("test"));
// 0. without isolation policy configured, lookup will success.
String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
assertTrue(brokerUrl.contains(brokerName));
log.info("0 get lookup url {}", brokerUrl);
// create
String policyName1 = "policy-1";
String cluster = pulsar.getConfiguration().getClusterName();
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
final List<String> primaryList = new ArrayList<>();
primaryList.add(brokerName + ".*");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(ns1Name))
.primary(primaryList)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.build();
admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
// 1. with matched isolation broker configured and matched, lookup will success.
brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
assertTrue(brokerUrl.contains(brokerName));
log.info(" 1 get lookup url {}", brokerUrl);
// 2. update isolation policy, without broker matched, lookup will fail.
nsPolicyData1.getPrimary().clear();
nsPolicyData1.getPrimary().add(brokerName + "not_match");
admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
try {
admin.lookups().lookupTopic(ns1Name + "/topic3");
} catch (Exception e) {
// expected lookup fail, because no brokers matched the policy.
log.info(" 2 expected fail lookup");
}
try {
admin.lookups().lookupTopic(ns1Name + "/topic1");
} catch (Exception e) {
// expected lookup fail, because no brokers matched the policy.
log.info(" 22 expected fail lookup");
}
admin.clusters().deleteNamespaceIsolationPolicy(cluster, policyName1);
}
@Test
public void clustersList() throws PulsarAdminException {
final String cluster = pulsar.getConfiguration().getClusterName();
admin.clusters().createCluster("global", ClusterData.builder()
.serviceUrl("http://localhost:6650").build());
// Global cluster, if there, should be omitted from the results
assertEquals(admin.clusters().getClusters(), List.of(cluster));
}
/**
* verifies cluster has been set before create topic
*
* @throws PulsarAdminException
*/
@Test
public void testClusterIsReadyBeforeCreateTopic() throws Exception {
restartClusterAfterTest();
final String topicName = "partitionedTopic";
final int partitions = 4;
final String persistentPartitionedTopicName = "persistent://" + defaultTenant + "/ns2/" + topicName;
final String NonPersistentPartitionedTopicName = "non-persistent://" + defaultTenant + "/ns2/" + topicName;
admin.namespaces().createNamespace(defaultTenant + "/ns2");
// By default the cluster will configure as configuration file. So the create topic operation
// will never throw exception except there is no cluster.
admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", Sets.newHashSet(configClusterName));
admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
}
@Test
public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
String localCluster = pulsar.getConfiguration().getClusterName();
String namespace = newUniqueName(defaultTenant + "/test-ns-with-no-clusters");
admin.namespaces().createNamespace(namespace);
// Global cluster, if there, should be omitted from the results
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
Collections.singletonList(localCluster));
}
@Test(timeOut = 30000)
public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException {
long timestamp = System.currentTimeMillis();
final String topicName = "consumer-stats-" + timestamp;
final String subscribeName = topicName + "-test-stats-sub";
final String topic = "persistent://" + defaultNamespace + "/" + topicName;
final String producerName = "producer-" + topicName;
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Producer<byte[]> producer = client.newProducer().topic(topic)
.enableBatching(false)
.producerName(producerName)
.create();
// a. Send a message to the topic.
producer.send("message-1".getBytes(StandardCharsets.UTF_8));
// b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed
// by the broker, the lastConsumedTimestamp will as the consume subscribe time.
Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.subscriptionName(subscribeName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
Message<byte[]> message = consumer.receive();
// Get the consumer stats.
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
long startConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
long startAckedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0);
long startConsumedTimestampInConsumerStats = consumerStats.getLastConsumedTimestamp();
long startAckedTimestampInConsumerStats = consumerStats.getLastAckedTimestamp();
// Because the message was pushed by the broker, the consumedTimestamp should not as 0.
assertNotEquals(0, startConsumedTimestampInConsumerStats);
// There is no consumer ack the message, so the lastAckedTimestamp still as 0.
assertEquals(0, startAckedTimestampInConsumerStats);
assertNotEquals(0, startConsumedFlowTimestamp);
assertEquals(0, startAckedTimestampInSubStats);
// c. The Consumer receives the message and acks the message.
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
while (true) {
topicStats = admin.topics().getStats(topic);
if (topicStats.getSubscriptions().get(subscribeName).getLastAckedTimestamp() != 0) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}
// Get the consumer stats.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
long consumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
long ackedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
consumerStats = subscriptionStats.getConsumers().get(0);
long consumedTimestamp = consumerStats.getLastConsumedTimestamp();
long ackedTimestamp = consumerStats.getLastAckedTimestamp();
// The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the
// consumer does not pull any messages.
assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
assertNotEquals(0, consumedFlowTimestamp);
assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);
// d. Send another messages. The lastConsumedTimestamp should be updated.
producer.send("message-2".getBytes(StandardCharsets.UTF_8));
// e. Receive the message and ack it.
message = consumer.receive();
consumer.acknowledge(message);
// Waiting for the ack command send to the broker.
while (true) {
topicStats = admin.topics().getStats(topic);
if (topicStats.getSubscriptions().get(subscribeName).getLastAckedTimestamp() != ackedTimestampInSubStats) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}
// Get the consumer stats again.
topicStats = admin.topics().getStats(topic);
subscriptionStats = topicStats.getSubscriptions().get(subscribeName);
long lastConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
long lastConsumedTimestampInSubStats = subscriptionStats.getLastConsumedTimestamp();
long lastAckedTimestampInSubStats = subscriptionStats.getLastAckedTimestamp();
consumerStats = subscriptionStats.getConsumers().get(0);
long lastConsumedTimestamp = consumerStats.getLastConsumedTimestamp();
long lastAckedTimestamp = consumerStats.getLastAckedTimestamp();
assertTrue(consumedTimestamp < lastConsumedTimestamp);
assertTrue(ackedTimestamp < lastAckedTimestamp);
assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp);
assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);
consumer.close();
producer.close();
}
@Test(timeOut = 30000)
public void testPreciseBacklog() throws Exception {
restartClusterIfReused();
final String topic = "persistent://" + defaultNamespace + "/precise-back-log";
final String subName = "sub-name";
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.create();
producer.send("message-1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer.receive();
assertNotNull(message);
// Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
// Since message have not acked, so the backlog is 10
PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName);
assertNotNull(subscription);
((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
consumer.acknowledge(message);
// wait for ack send
Awaitility.await().untilAsserted(() -> {
// Consumer acks the message, so the precise backlog is 0
TopicStats topicStats2 = admin.topics().getStats(topic, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 0);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 0);
});
topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9);
}
@Test
public void testDeleteTenant() throws Exception {
restartClusterAfterTest();
// Disabled conf: systemTopicEnabled. see: https://github.com/apache/pulsar/pull/17070
boolean originalSystemTopicEnabled = conf.isSystemTopicEnabled();
if (originalSystemTopicEnabled) {
cleanup();
conf.setSystemTopicEnabled(false);
setup();
}
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
String tenant = newUniqueName("test-tenant-1");
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace
String namespace = tenant + "/test-ns-1";
admin.namespaces().createNamespace(namespace, Set.of("test"));
assertEquals(admin.namespaces().getNamespaces(tenant), List.of(namespace));
// create topic
String topic = namespace + "/test-topic-1";
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}
// delete topic
admin.topics().deletePartitionedTopic(topic);
assertTrue(admin.topics().getList(namespace).isEmpty());
// delete namespace
deleteNamespaceWithRetry(namespace, false);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
// delete tenant
admin.tenants().deleteTenant(tenant);
assertFalse(admin.tenants().getTenants().contains(tenant));
final String managedLedgersPath = "/managed-ledgers/" + tenant;
final String partitionedTopicPath = "/admin/partitioned-topics/" + tenant;
final String localPoliciesPath = "/admin/local-policies/" + tenant;
final String bundleDataPath = "/loadbalance/bundle-data/" + tenant;
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}
@Data
@AllArgsConstructor
private static class NamespaceAttr {
private boolean systemTopicEnabled;
private TopicType autoTopicCreationType;
private int defaultNumPartitions;
private boolean forceDeleteNamespaceAllowed;
}
@DataProvider(name = "namespaceAttrs")
public Object[][] namespaceAttributes(){
return new Object[][]{
{new NamespaceAttr(false, TopicType.NON_PARTITIONED, 0, false)},
{new NamespaceAttr(true, TopicType.NON_PARTITIONED, 0, false)},
{new NamespaceAttr(true, TopicType.PARTITIONED, 3, false)}
};
}
private NamespaceAttr markOriginalNamespaceAttr(){
return new NamespaceAttr(conf.isSystemTopicEnabled(), conf.getAllowAutoTopicCreationType(),
conf.getDefaultNumPartitions(), conf.isForceDeleteNamespaceAllowed());
}
private void setNamespaceAttr(NamespaceAttr namespaceAttr){
conf.setSystemTopicEnabled(namespaceAttr.systemTopicEnabled);
conf.setAllowAutoTopicCreationType(namespaceAttr.autoTopicCreationType);
conf.setDefaultNumPartitions(namespaceAttr.defaultNumPartitions);
conf.setForceDeleteNamespaceAllowed(namespaceAttr.forceDeleteNamespaceAllowed);
}
@Test(dataProvider = "namespaceAttrs")
public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
restartClusterAfterTest();
// Set conf.
cleanup();
setNamespaceAttr(namespaceAttr);
this.conf.setMetadataStoreUrl("127.0.0.1:2181");
this.conf.setConfigurationMetadataStoreUrl("127.0.0.1:2182");
setup();
String tenant = newUniqueName("test-tenant");
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace
String namespace = tenant + "/test-ns";
admin.namespaces().createNamespace(namespace, Set.of("test"));
assertEquals(admin.namespaces().getNamespaces(tenant), List.of(namespace));
// create topic
String topic = namespace + "/test-topic";
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
final String managedLedgersPath = "/managed-ledgers/" + namespace;
final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
// Trigger bundle owned by brokers.
pulsarClient.newProducer().topic(topic).create().close();
// Trigger bundle data write to ZK.
Awaitility.await().untilAsserted(() -> {
boolean bundleDataWereWriten = false;
for (PulsarService ps : new PulsarService[]{pulsar, mockPulsarSetup.getPulsar()}) {
ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) ps.getLoadManager().get();
ModularLoadManagerImpl loadManagerImpl = (ModularLoadManagerImpl) loadManager.getLoadManager();
ps.getBrokerService().updateRates();
loadManagerImpl.updateLocalBrokerData();
loadManagerImpl.writeBundleDataOnZooKeeper();
bundleDataWereWriten = bundleDataWereWriten || ps.getLocalMetadataStore().exists(bundleDataPath).join();
}
assertTrue(bundleDataWereWriten);
});
// assert znode exists in metadata store
assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}
// delete topic
admin.topics().deletePartitionedTopic(topic);
assertTrue(admin.topics().getList(namespace).isEmpty());
// delete namespace
deleteNamespaceWithRetry(namespace, false);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
// assert znode deleted in metadata store
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}
@Test
public void testDeleteNamespaceWithTopicPolicies() throws Exception {
String tenant = newUniqueName("test-tenant");
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace2
String namespace = tenant + "/test-ns2";
admin.namespaces().createNamespace(namespace, Set.of("test"));
admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
// verify namespace can be deleted even without topic policy events
admin.namespaces().deleteNamespace(namespace, true);
Awaitility.await().untilAsserted(() -> {
final CompletableFuture<Optional<Topic>> eventTopicFuture =
pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events");
assertNull(eventTopicFuture);
});
admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producer.send("test".getBytes(StandardCharsets.UTF_8));
BacklogQuota backlogQuota = BacklogQuotaImpl
.builder()
.limitTime(1000)
.limitSize(1000)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.topicPolicies().setBacklogQuota(topic, backlogQuota);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(admin.topicPolicies()
.getBacklogQuotaMap(topic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota);
});
producer.close();
admin.topics().delete(topic);
deleteNamespaceWithRetry(namespace, false);
Awaitility.await().untilAsserted(() -> {
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
});
}
@Test(timeOut = 30000)
public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {
final String topic = "persistent://" + defaultNamespace + "/precise-back-log-no-delayed-" + UUID.randomUUID().toString();
final String subName = "sub-name";
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.create();
for (int i = 0; i < 10; i++) {
if (i > 4) {
producer.newMessage()
.value("message-1".getBytes(StandardCharsets.UTF_8))
.deliverAfter(10, TimeUnit.SECONDS)
.send();
} else {
producer.send("message-1".getBytes(StandardCharsets.UTF_8));
}
}
// Wait for messages to be tracked for delayed delivery. This happens
// on the consumer dispatch side, so when the send() is complete we're
// not yet guaranteed to see the stats updated.
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
});
for (int i = 0; i < 5; i++) {
consumer.acknowledge(consumer.receive());
}
// Wait the ack send.
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
});
}
@Test
public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException, PulsarAdminException {
final String topic = "persistent://" + defaultNamespace + "/precise-back-log-for-partitioned-topic";
admin.topics().createPartitionedTopic(topic, 2);
final String subName = "sub-name";
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.create();
producer.send("message-1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer.receive();
assertNotNull(message);
// Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
// Since message have not acked, so the backlog is 10
for (int i = 0; i < 2; i++) {
PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic + "-partition-" + i).get().getSubscription(subName);
assertNotNull(subscription);
((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
}
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 20);
topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
}
@Test(timeOut = 30000)
public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException {
final String topic = "persistent://" + defaultNamespace + "/precise-back-log-no-delayed-partitioned-topic";
admin.topics().createPartitionedTopic(topic, 2);
final String subName = "sub-name";
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
long start1 = 0;
long start2 = 0;
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.create();
for (int i = 0; i < 10; i++) {
if (i == 0) {
start1 = Clock.systemUTC().millis();
}
if (i == 5) {
start2 = Clock.systemUTC().millis();
}
if (i > 4) {
producer.newMessage()
.value("message-1".getBytes(StandardCharsets.UTF_8))
.deliverAfter(10, TimeUnit.SECONDS)
.send();
} else {
producer.send("message-1".getBytes(StandardCharsets.UTF_8));
}
}
// wait until the message add to delay queue.
long finalStart1 = start1;
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart1);
});
for (int i = 0; i < 5; i++) {
consumer.acknowledge(consumer.receive());
}
// Wait the ack send.
long finalStart2 = start2;
Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> {
TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart2);
});
}
@Test
public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
restartClusterAfterTest();
final String topic = "persistent://" + defaultNamespace + "/max-num-partitions-per-partitioned-topic-success";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);
try {
admin.topics().createPartitionedTopic(topic, 2);
} catch (Exception e) {
fail("should not throw any exceptions");
}
// reset configuration
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
}
@Test
public void testMaxNumPartitionsPerPartitionedTopicFailure() {
restartClusterAfterTest();
final String topic = "persistent://" + defaultNamespace + "/max-num-partitions-per-partitioned-topic-failure";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);
try {
admin.topics().createPartitionedTopic(topic, 3);
fail("should throw exception when number of partitions exceed than max partitions");
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException);
}
// reset configuration
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
}
@Test
public void testListOfNamespaceBundles() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
String tenantName = newUniqueName("prop-xyz2");
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(tenantName + "/ns1", 10);
admin.namespaces().setNamespaceReplicationClusters(tenantName + "/ns1", Set.of("test"));
admin.namespaces().createNamespace(tenantName + "/test/ns2", 10);
assertEquals(admin.namespaces().getBundles(tenantName + "/ns1").getNumBundles(), 10);
assertEquals(admin.namespaces().getBundles(tenantName + "/test/ns2").getNumBundles(), 10);
admin.namespaces().deleteNamespace(tenantName + "/test/ns2");
}
@Test
public void testForceDeleteNamespace() throws Exception {
restartClusterAfterTest();
String tenantName = newUniqueName("prop-xyz2");
final String namespaceName = tenantName + "/ns1";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(namespaceName, 1);
final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
pulsarClient.newProducer(Schema.DOUBLE).topic(topic).create().close();
Awaitility.await().untilAsserted(() -> assertNotNull(admin.schemas().getSchemaInfo(topic)));
deleteNamespaceWithRetry(namespaceName, true);
try {
admin.schemas().getSchemaInfo(topic);
Assert.fail("fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 404);
}
}
@Test
public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
String tenantName = newUniqueName("prop-xyz2");
final String namespaceName = tenantName + "/ns1";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(namespaceName, 1);
admin.namespaces().setAutoTopicCreation(namespaceName,
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(20)
.build());
final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
// start a consumer, that creates the topic
try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic)
.subscriptionName("test").autoUpdatePartitions(true).subscribe()) {
// wait for the consumer to settle
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertNotNull(admin.topics().getSubscriptions(topic).contains("test")));
// verify that the partitioned topic is created
assertEquals(20, admin.topics().getPartitionedTopicMetadata(topic).partitions);
// the consumer will race with the deletion
// the consumer will try to re-create the partitions
admin.namespaces().deleteNamespace(namespaceName, true);
assertFalse(admin.namespaces().getNamespaces(tenantName).contains("ns1"));
}
}
@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
String clusterName = "test2";
admin.clusters().createCluster(clusterName, cluster);
Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);
// update
cluster = ClusterData.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.proxyServiceUrl("pulsar://example.com")
.proxyProtocol(ProxyProtocol.SNI)
.build();
admin.clusters().updateCluster(clusterName, cluster);
Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster);
}
@Test
public void testMaxNamespacesPerTenant() throws Exception {
restartClusterAfterTest();
cleanup();
conf.setMaxNamespacesPerTenant(2);
setup();
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
admin.namespaces().createNamespace("testTenant/ns2", Set.of("test"));
try {
admin.namespaces().createNamespace("testTenant/ns3", Set.of("test"));
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
Assert.assertEquals(e.getHttpError(), "Exceed the maximum number of namespace in tenant :testTenant");
}
}
@Test
public void testAutoTopicCreationOverrideWithMaxNumPartitionsLimit() throws Exception{
restartClusterAfterTest();
cleanup();
conf.setMaxNumPartitionsPerPartitionedTopic(10);
setup();
TenantInfoImpl tenantInfo = new TenantInfoImpl(
Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
// test non-partitioned
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
AutoTopicCreationOverride overridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("non-partitioned")
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", overridePolicy);
AutoTopicCreationOverride newOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(overridePolicy, newOverridePolicy);
// test partitioned
AutoTopicCreationOverride partitionedOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(10)
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedOverridePolicy);
AutoTopicCreationOverride partitionedNewOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(partitionedOverridePolicy, partitionedNewOverridePolicy);
// test partitioned with error
AutoTopicCreationOverride partitionedWrongOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(123)
.build();
try {
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedWrongOverridePolicy);
fail();
} catch (Exception ex) {
assertTrue(ex.getCause() instanceof NotAcceptableException);
}
}
@Test
public void testMaxTopicsPerNamespace() throws Exception {
restartClusterAfterTest();
cleanup();
conf.setMaxTopicsPerNamespace(10);
setup();
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
// check create partitioned/non-partitioned topics
String topic = "persistent://testTenant/ns1/test_create_topic_v";
admin.topics().createPartitionedTopic(topic + "1", 2);
admin.topics().createPartitionedTopic(topic + "2", 3);
admin.topics().createPartitionedTopic(topic + "3", 4);
admin.topics().createNonPartitionedTopic(topic + "4");
try {
admin.topics().createPartitionedTopic(topic + "5", 2);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
Assert.assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace.");
}
//unlimited
cleanup();
conf.setMaxTopicsPerNamespace(0);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
for (int i = 0; i < 10; ++i) {
admin.topics().createPartitionedTopic(topic + i, 2);
admin.topics().createNonPartitionedTopic(topic + i + i);
}
// check first create normal topic, then system topics, unlimited even setMaxTopicsPerNamespace
cleanup();
conf.setMaxTopicsPerNamespace(5);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
cleanup();
conf.setMaxTopicsPerNamespace(5);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
// check producer/consumer auto create partitioned topic
cleanup();
conf.setMaxTopicsPerNamespace(10);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
pulsarClient.newProducer().topic(topic + "1").create().close();
pulsarClient.newProducer().topic(topic + "2").create().close();
pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
try {
pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
}
// check producer/consumer auto create non-partitioned topic
cleanup();
conf.setMaxTopicsPerNamespace(3);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
pulsarClient.newProducer().topic(topic + "1").create().close();
pulsarClient.newProducer().topic(topic + "2").create().close();
pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close();
try {
pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
}
}
@Test
public void testInvalidBundleErrorResponse() throws Exception {
try {
admin.namespaces().deleteNamespaceBundle(defaultNamespace, "invalid-bundle");
fail("should have failed due to invalid bundle");
} catch (PreconditionFailedException e) {
assertTrue(e.getMessage().startsWith("Invalid bundle range"));
}
}
@Test
public void testMaxSubscriptionsPerTopic() throws Exception {
restartClusterAfterTest();
cleanup();
conf.setMaxSubscriptionsPerTopic(2);
setup();
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic";
admin.topics().createPartitionedTopic(topic, 3);
Producer producer = pulsarClient.newProducer().topic(topic).create();
producer.close();
// create subscription
admin.topics().createSubscription(topic, "test-sub1", MessageId.earliest);
admin.topics().createSubscription(topic, "test-sub2", MessageId.earliest);
try {
admin.topics().createSubscription(topic, "test-sub3", MessageId.earliest);
Assert.fail();
} catch (PulsarAdminException e) {
log.info("create subscription failed. Exception: ", e);
}
cleanup();
conf.setMaxSubscriptionsPerTopic(0);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
admin.topics().createPartitionedTopic(topic, 3);
producer = pulsarClient.newProducer().topic(topic).create();
producer.close();
for (int i = 0; i < 10; ++i) {
admin.topics().createSubscription(topic, "test-sub" + i, MessageId.earliest);
}
cleanup();
conf.setMaxSubscriptionsPerTopic(2);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
admin.topics().createPartitionedTopic(topic, 3);
producer = pulsarClient.newProducer().topic(topic).create();
producer.close();
Consumer consumer1 = null;
Consumer consumer2 = null;
Consumer consumer3 = null;
try {
consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe();
Assert.assertNotNull(consumer1);
} catch (PulsarClientException e) {
Assert.fail();
}
try {
consumer2 = pulsarClient.newConsumer().subscriptionName("test-sub2").topic(topic).subscribe();
Assert.assertNotNull(consumer2);
} catch (PulsarClientException e) {
Assert.fail();
}
try {
consumer3 = pulsarClient.newConsumer().subscriptionName("test-sub3").topic(topic).subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("subscription reached max subscriptions per topic");
}
consumer1.close();
consumer2.close();
admin.topics().deletePartitionedTopic(topic);
}
@Test(timeOut = 30000)
public void testMaxSubPerTopicApi() throws Exception {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100);
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100);
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace));
admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get();
assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200);
admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace);
Awaitility.await().untilAsserted(()
-> assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get()));
try {
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100);
fail("should fail");
} catch (PulsarAdminException ignore) {
}
}
@Test(timeOut = 60000)
public void testSetNamespaceEntryFilters() throws Exception {
restartClusterAfterTest();
restartClusterIfReused();
@Cleanup
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);
try {
EntryFilters entryFilters = new EntryFilters("test");
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topicName = myNamespace + "/topic";
admin.topics().createNonPartitionedTopic(topicName);
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
assertEquals(pulsar
.getBrokerService()
.getTopic(topicName, false)
.get()
.get()
.getEntryFilters()
.size(), 0);
admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters);
assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), entryFilters);
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(topicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "test");
});
assertEquals(pulsar
.getBrokerService()
.getTopic(topicName, false)
.get()
.get()
.getEntryFilters()
.size(), 1);
admin.namespaces().removeNamespaceEntryFilters(myNamespace);
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(topicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "");
});
assertEquals(pulsar
.getBrokerService()
.getTopic(topicName, false)
.get()
.get()
.getEntryFilters()
.size(), 0);
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}
@Test(dataProvider = "topicType")
public void testSetTopicLevelEntryFilters(String topicType) throws Exception {
restartClusterAfterTest();
restartClusterIfReused();
@Cleanup
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);
try {
EntryFilters entryFilters = new EntryFilters("test");
final String topic = topicType + "://" + defaultNamespace + "/test-schema-validation-enforced";
admin.topics().createPartitionedTopic(topic, 1);
final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(fullTopicName)
.create();
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters()
.size(), 0);
admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false), entryFilters));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "test");
});
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters()
.size(), 1);
admin.topicPolicies().removeEntryFiltersPerTopic(topic);
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "");
});
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters()
.size(), 0);
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}
@Test(timeOut = 60000)
public void testSetEntryFiltersHierarchy() throws Exception {
restartClusterAfterTest();
restartClusterIfReused();
@Cleanup
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
), new EntryFilterDefinition(
"test1",
null,
EntryFilter2Test.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);
conf.setEntryFilterNames(List.of("test", "test1"));
conf.setAllowOverrideEntryFilters(true);
try {
final String topic = "persistent://" + defaultNamespace + "/test-schema-validation-enforced";
admin.topics().createPartitionedTopic(topic, 1);
final String fullTopicName = topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(fullTopicName)
.create();
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test,test1")));
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters()
.size(), 2);
EntryFilters nsEntryFilters = new EntryFilters("test");
admin.namespaces().setNamespaceEntryFilters(defaultNamespace, nsEntryFilters);
assertEquals(admin.namespaces().getNamespaceEntryFilters(defaultNamespace), nsEntryFilters);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test")));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "test");
});
Awaitility.await().untilAsserted(() -> {
final List<EntryFilter> entryFilters = pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters();
assertEquals(entryFilters.size(), 1);
assertEquals(((EntryFilterWithClassLoader)entryFilters.get(0))
.getEntryFilter().getClass(), EntryFilterTest.class);
});
EntryFilters topicEntryFilters = new EntryFilters("test1");
admin.topicPolicies().setEntryFiltersPerTopic(topic, topicEntryFilters);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false), topicEntryFilters));
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test1")));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFiltersPolicy()
.getEntryFilterNames(), "test1");
});
final List<EntryFilter> entryFilters = pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
.get()
.get()
.getEntryFilters();
assertEquals(entryFilters.size(), 1);
assertEquals(((EntryFilterWithClassLoader) entryFilters.get(0))
.getEntryFilter().getClass(), EntryFilter2Test.class);
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}
@Test(timeOut = 60000)
public void testValidateNamespaceEntryFilters() throws Exception {
restartClusterAfterTest();
restartClusterIfReused();
@Cleanup
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);
try {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(""));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(","));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("test,notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}
@Test(timeOut = 60000)
public void testValidateTopicEntryFilters() throws Exception {
restartClusterAfterTest();
restartClusterIfReused();
@Cleanup
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);
try {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topicName = myNamespace + "/topic";
admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topicName)
.create();
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(""));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(","));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("test,notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topicName, false));
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}
@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
restartClusterAfterTest();
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
pulsarClient.newProducer().topic(topic).create().close();
final int maxSub = 2;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub);
List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
consumerList.add(consumer);
}
//Create a client that can fail quickly
try (PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build()){
@Cleanup
Consumer<byte[]> subscribe =
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
for (Consumer<?> c : consumerList) {
c.close();
}
}
@Test(timeOut = 30000)
public void testMaxSubPerTopicPriority() throws Exception {
restartClusterAfterTest();
final int brokerLevelMaxSub = 2;
cleanup();
conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub);
setup();
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
//Create a client that can fail quickly
@Cleanup
PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build();
//We can only create 2 consumers
List<Consumer<?>> consumerList = new ArrayList<>(brokerLevelMaxSub);
for (int i = 0; i < brokerLevelMaxSub; i++) {
Consumer<?> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
consumerList.add(consumer);
}
try {
@Cleanup
Consumer<byte[]> subscribe =
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {
}
//Set namespace-level policy,the limit should up to 4
final int nsLevelMaxSub = 4;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub);
try {
@Cleanup
Consumer<byte[]> subscribe =
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
} catch (Exception ignore) {
}
for (Consumer<?> c : consumerList) {
c.close();
}
}
@Test
public void testMaxProducersPerTopicUnlimited() throws Exception {
restartClusterAfterTest();
final int maxProducersPerTopic = 1;
cleanup();
conf.setMaxProducersPerTopic(maxProducersPerTopic);
setup();
//init namespace
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
List<Producer<byte[]>> producers = new ArrayList<>();
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producers.add(producer);
}
admin.namespaces().removeMaxProducersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
try {
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max producers limit"));
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
// should success
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producers.add(producer);
try {
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max producers limit"));
}
//clean up
for (Producer<byte[]> tempProducer : producers) {
tempProducer.close();
}
}
@Test
public void testMaxConsumersPerTopicUnlimited() throws Exception {
restartClusterAfterTest();
final int maxConsumersPerTopic = 1;
cleanup();
conf.setMaxConsumersPerTopic(maxConsumersPerTopic);
setup();
//init namespace
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";
assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
}
admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
// should success
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
}
//clean up
for (Consumer<byte[]> subConsumer : consumers) {
subConsumer.close();
}
}
@Test
public void testClearBacklogForTheSubscriptionThatNoConsumers() throws Exception {
final String topic = "persistent://" + defaultNamespace + "/clear_backlog_no_consumers" + UUID.randomUUID().toString();
final String sub = "my-sub";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, sub, MessageId.earliest);
admin.topics().skipAllMessages(topic, sub);
}
@Test(timeOut = 200000)
public void testCompactionApi() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(namespace, Set.of("test"));
assertNull(admin.namespaces().getCompactionThreshold(namespace));
assertEquals(pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes(), 0);
admin.namespaces().setCompactionThreshold(namespace, 10);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getCompactionThreshold(namespace)));
assertEquals(admin.namespaces().getCompactionThreshold(namespace).intValue(), 10);
admin.namespaces().removeCompactionThreshold(namespace);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(namespace)));
}
@Test(timeOut = 200000)
public void testCompactionPriority() throws Exception {
restartClusterAfterTest();
cleanup();
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10000);
setup();
final String namespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(namespace, Set.of("test"));
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
TopicName topicName = TopicName.get(topic);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
PersistentTopic mockTopic = spy(persistentTopic);
mockTopic.checkCompaction();
// Disabled by default
verify(mockTopic, times(0)).triggerCompaction();
// Set namespace-level policy
admin.namespaces().setCompactionThreshold(namespace, 1);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getCompactionThreshold(namespace)));
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
Field field = managedLedger.getClass().getDeclaredField("totalSize");
field.setAccessible(true);
field.setLong(managedLedger, 1000L);
mockTopic.checkCompaction();
verify(mockTopic, times(1)).triggerCompaction();
//Set topic-level policy
admin.topics().setCompactionThreshold(topic, 0);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic)));
mockTopic.checkCompaction();
verify(mockTopic, times(1)).triggerCompaction();
// Remove topic-level policy
admin.topics().removeCompactionThreshold(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic)));
mockTopic.checkCompaction();
verify(mockTopic, times(2)).triggerCompaction();
// Remove namespace-level policy
admin.namespaces().removeCompactionThreshold(namespace);
Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getCompactionThreshold(namespace)));
mockTopic.checkCompaction();
verify(mockTopic, times(2)).triggerCompaction();
}
@Test
public void testProperties() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(namespace, Set.of("test"));
admin.namespaces().setProperty(namespace, "a", "a");
assertEquals("a", admin.namespaces().getProperty(namespace, "a"));
assertNull(admin.namespaces().getProperty(namespace, "b"));
admin.namespaces().setProperty(namespace, "b", "b");
assertEquals("b", admin.namespaces().getProperty(namespace, "b"));
admin.namespaces().setProperty(namespace, "a", "a1");
assertEquals("a1", admin.namespaces().getProperty(namespace, "a"));
assertEquals("b", admin.namespaces().removeProperty(namespace, "b"));
assertNull(admin.namespaces().getProperty(namespace, "b"));
admin.namespaces().clearProperties(namespace);
assertEquals(admin.namespaces().getProperties(namespace).size(), 0);
Map<String, String> properties = new HashMap<>();
properties.put("aaa", "aaa");
properties.put("bbb", "bbb");
admin.namespaces().setProperties(namespace, properties);
assertEquals(admin.namespaces().getProperties(namespace), properties);
admin.namespaces().clearProperties(namespace);
assertEquals(admin.namespaces().getProperties(namespace).size(), 0);
}
@Test
public void testGetListInBundle() throws Exception {
final String namespace = defaultTenant + "/ns11";
admin.namespaces().createNamespace(namespace, 3);
final String persistentTopicName = TopicName.get(
"persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
final String nonPersistentTopicName = TopicName.get(
"non-persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
admin.topics().createPartitionedTopic(persistentTopicName, 3);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
pulsarClient.newProducer().topic(persistentTopicName).create().close();
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
List<String> boundaries = bundlesData.getBoundaries();
int topicNum = 0;
for (int i = 0; i < boundaries.size() - 1; i++) {
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
List<String> topic = admin.topics().getListInBundle(namespace, bundle);
if (topic == null) {
continue;
}
topicNum += topic.size();
for (String s : topic) {
assertFalse(TopicName.get(s).isPersistent());
}
}
assertEquals(topicNum, 3);
}
@Test
public void testGetTopicsWithDifferentMode() throws Exception {
final String namespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(namespace, Set.of("test"));
final String persistentTopicName = TopicName
.get("persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
.toString();
final String nonPersistentTopicName = TopicName
.get("non-persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
.toString();
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create();
List<String> topics = new ArrayList<>(admin.topics().getList(namespace));
assertEquals(topics.size(), 2);
assertTrue(topics.contains(persistentTopicName));
assertTrue(topics.contains(nonPersistentTopicName));
topics.clear();
topics.addAll(admin.topics().getList(namespace, TopicDomain.persistent));
assertEquals(topics.size(), 1);
assertTrue(topics.contains(persistentTopicName));
topics.clear();
topics.addAll(admin.topics().getList(namespace, TopicDomain.non_persistent));
assertEquals(topics.size(), 1);
assertTrue(topics.contains(nonPersistentTopicName));
try {
admin.topics().getList(namespace, TopicDomain.getEnum("none"));
fail("Should failed with invalid get topic mode.");
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Invalid topic domain: 'none'");
}
producer1.close();
producer2.close();
}
@Test(dataProvider = "isV1")
public void testNonPartitionedTopic(boolean isV1) throws Exception {
restartClusterAfterTest();
String tenant = defaultTenant;
String cluster = "test";
String namespace = tenant + "/" + (isV1 ? cluster + "/" : "") + "n1" + isV1;
String topic = "persistent://" + namespace + "/t1" + isV1;
admin.namespaces().createNamespace(namespace, Set.of(cluster));
admin.topics().createNonPartitionedTopic(topic);
assertTrue(admin.topics().getList(namespace).contains(topic));
}
/**
* Validate retring failed partitioned topic should succeed.
* @throws Exception
*/
@Test
public void testFailedUpdatePartitionedTopic() throws Exception {
final String topicName = "failed-topic";
final String subName1 = topicName + "-my-sub-1";
final int startPartitions = 4;
final int newPartitions = 8;
final String partitionedTopicName = "persistent://" + defaultNamespace + "/" + topicName;
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
consumer1.close();
// validate partition topic is created
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);
// create a subscription for few new partition which can fail
try {
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
} catch (PulsarAdminException.ConflictException ex) {
// OK
}
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true);
// validate subscription is created for new partition.
for (int i = startPartitions; i < newPartitions; i++) {
assertNotNull(
admin.topics().getStats(partitionedTopicName + "-partition-" + i).getSubscriptions().get(subName1));
}
// validate update partition is success
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, newPartitions);
}
/**
* Validate retring failed partitioned topic should succeed.
* @throws Exception
*/
@Test
public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String subscriptionName = "s1";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
// Send one message.
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
.create();
MessageIdImpl messageId = (MessageIdImpl) producer.send("123");
// Catch up.
admin.topics().skipAllMessages(topicName, subscriptionName);
// Get topic stats with earliestTimeInBacklog
TopicStats topicStats = admin.topics().getStats(topicName, false, false, true);
assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(), -1L);
// cleanup.
producer.close();
admin.topics().delete(topicName);
}
@Test(dataProvider = "topicType")
public void testPartitionedStatsAggregationByProducerName(String topicType) throws Exception {
restartClusterIfReused();
conf.setAggregatePublisherStatsByProducerName(true);
final String topic = topicType + "://" + defaultNamespace + "/test-partitioned-stats-aggregation-by-producer-name";
admin.topics().createPartitionedTopic(topic, 10);
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 0;
}
})
.accessMode(ProducerAccessMode.Shared)
.create();
@Cleanup
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return msg.hasKey() ? Integer.parseInt(msg.getKey()) : 5;
}
})
.accessMode(ProducerAccessMode.Shared)
.create();
for (int i = 0; i < 10; i++) {
producer1.newMessage()
.key(String.valueOf(i % 5))
.value(("message".getBytes(StandardCharsets.UTF_8)))
.send();
producer2.newMessage()
.key(String.valueOf(i % 5 + 5))
.value(("message".getBytes(StandardCharsets.UTF_8)))
.send();
}
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(topic, true);
assertEquals(topicStats.getPartitions().size(), 10);
assertEquals(topicStats.getPartitions().values().stream().mapToInt(e -> e.getPublishers().size()).sum(), 10);
assertEquals(topicStats.getPartitions().values().stream().map(e -> e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
assertEquals(topicStats.getPublishers().size(), 2);
topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
}
@Test(dataProvider = "topicType")
public void testPartitionedStatsAggregationByProducerNamePerPartition(String topicType) throws Exception {
restartClusterIfReused();
conf.setAggregatePublisherStatsByProducerName(true);
final String topic = topicType + "://" + defaultNamespace + "/test-partitioned-stats-aggregation-by-producer-name-per-pt";
admin.topics().createPartitionedTopic(topic, 2);
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
.create();
@Cleanup
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 1)
.create();
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(topic, true);
assertEquals(topicStats.getPartitions().size(), 2);
assertEquals(topicStats.getPartitions().values().stream().mapToInt(e -> e.getPublishers().size()).sum(), 2);
assertEquals(topicStats.getPartitions().values().stream().map(e -> e.getPublishers().get(0).getProducerName()).distinct().count(), 2);
assertEquals(topicStats.getPublishers().size(), 2);
topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
}
@Test(dataProvider = "topicType")
public void testSchemaValidationEnforced(String topicType) throws Exception {
final String topic = topicType + "://" + defaultNamespace + "/test-schema-validation-enforced";
admin.topics().createPartitionedTopic(topic, 1);
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
.create();
boolean schemaValidationEnforced = admin.topics().getSchemaValidationEnforced(topic, false);
assertEquals(schemaValidationEnforced, false);
admin.topics().setSchemaValidationEnforced(topic, true);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true)
);
}
@Test
public void testGetNamespaceTopicList() throws Exception {
final String persistentTopic = "persistent://" + defaultNamespace + "/testGetNamespaceTopicList";
final String nonPersistentTopic = "non-persistent://" + defaultNamespace + "/non-testGetNamespaceTopicList";
final String eventTopic = "persistent://" + defaultNamespace + "/__change_events";
admin.topics().createNonPartitionedTopic(persistentTopic);
Awaitility.await().untilAsserted(() ->
admin.namespaces().getTopics(defaultNamespace,
ListNamespaceTopicsOptions.builder().mode(Mode.PERSISTENT).includeSystemTopic(true).build())
.contains(eventTopic));
List<String> notIncludeSystemTopics = admin.namespaces().getTopics(defaultNamespace,
ListNamespaceTopicsOptions.builder().includeSystemTopic(false).build());
Assert.assertFalse(notIncludeSystemTopics.contains(eventTopic));
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(nonPersistentTopic)
.create();
List<String> notPersistentTopics = admin.namespaces().getTopics(defaultNamespace,
ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
}
@Test
private void testTerminateSystemTopic() throws Exception {
final String topic = "persistent://" + defaultNamespace + "/testTerminateSystemTopic";
admin.topics().createNonPartitionedTopic(topic);
final String eventTopic = "persistent://" + defaultNamespace + "/__change_events";
admin.topicPolicies().setMaxConsumers(topic, 2);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(admin.topicPolicies().getMaxConsumers(topic), Integer.valueOf(2));
});
PulsarAdminException ex = expectThrows(PulsarAdminException.class,
() -> admin.topics().terminateTopic(eventTopic));
assertTrue(ex instanceof PulsarAdminException.NotAllowedException);
}
@Test
private void testDeleteNamespaceForciblyWithManyTopics() throws Exception {
final String ns = defaultTenant + "/ns-testDeleteNamespaceForciblyWithManyTopics";
admin.namespaces().createNamespace(ns, 2);
for (int i = 0; i < 100; i++) {
admin.topics().createPartitionedTopic(String.format("persistent://%s", ns + "/topic" + i), 3);
}
admin.namespaces().deleteNamespace(ns, true);
Assert.assertFalse(admin.namespaces().getNamespaces(defaultTenant).contains(ns));
}
@Test
private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Exception {
final String ns = defaultTenant + "/ns-testSetBacklogQuotasNamespaceLevel";
final long backlogQuotaLimitSize = 100000002;
final int backlogQuotaLimitTime = 2;
admin.namespaces().createNamespace(ns, 2);
// create retention.
admin.namespaces().setRetention(ns, new RetentionPolicies(1800, 10000));
// set backlog quota.
admin.namespaces().setBacklogQuota(ns, BacklogQuota.builder()
.limitSize(backlogQuotaLimitSize).limitTime(backlogQuotaLimitTime).build());
// Verify result.
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> map = admin.namespaces().getBacklogQuotaMap(ns);
assertEquals(map.size(), 1);
assertTrue(map.containsKey(BacklogQuota.BacklogQuotaType.destination_storage));
BacklogQuota backlogQuota = map.get(BacklogQuota.BacklogQuotaType.destination_storage);
assertEquals(backlogQuota.getLimitSize(), backlogQuotaLimitSize);
assertEquals(backlogQuota.getLimitTime(), backlogQuotaLimitTime);
// cleanup.
admin.namespaces().deleteNamespace(ns);
}
@Test
private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
// Send 10 messages.
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
.receiverQueueSize(0).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send(i + "");
}
// Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
for (int i = 0; i < 10; i++) {
Awaitility.await().untilAsserted(() -> {
Message m = consumer.receive();
assertNotNull(m);
consumer.acknowledge(m);
});
}
// cleanup.
consumer.close();
producer.close();
admin.topics().delete(topic);
}
@Test
public void testGetStatsIfPartitionNotExists() throws Exception {
// create topic.
final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(partitionedTp, 1);
TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertTrue(topicExists1);
// Verify topics-stats works.
TopicStats topicStats = admin.topics().getStats(partition0.toString());
assertNotNull(topicStats);
// Delete partition and call topic-stats again.
admin.topics().delete(partition0.toString());
boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertFalse(topicExists2);
// Verify: respond 404.
try {
admin.topics().getStats(partition0.toString());
fail("Should respond 404 after the partition was deleted");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("Topic partitions were not yet created"));
}
// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
}