blob: 60e591446d1851370f1d6f17e54aba23260c9577 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-admin")
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
private final String testTenant = "my-tenant";
private final String testNamespace = "my-namespace";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String myNamespaceV1 = testTenant + "/test/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";
private final String persistenceTopic = "persistent://" + myNamespace + "/test-set-persistence";
private final String topicPolicyEventsTopic = "persistent://" + myNamespace + "/__change_events";
private final int testTopicPartitions = 2;
@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setDefaultNumberOfNamespaceBundles(1);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant(this.testTenant, tenantInfo);
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
admin.namespaces().createNamespace(myNamespaceV1);
admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
Producer producer = pulsarClient.newProducer().topic(testTopic).create();
producer.close();
waitForZooKeeperWatchers();
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
this.resetConfig();
}
@Test
public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Exception{
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(),
NamespaceName.get(myNamespace),
"test-" + UUID.randomUUID()
);
String topic = topicName.toString();
SystemTopicBasedTopicPoliciesService policyService =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
//set up topic with maxSubscriptionsPerTopic = 10
admin.topics().createNonPartitionedTopic(topic);
admin.topicPolicies().setMaxSubscriptionsPerTopicAsync(topic, 10).get();
//wait until topic loaded with right policy value.
Awaitility.await().untilAsserted(()-> {
AbstractTopic topic1 = (AbstractTopic) pulsar.getBrokerService().getTopic(topic, true).get().get();
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(), Integer.valueOf(10));
});
//unload the topic
pulsar.getNamespaceService().unloadNamespaceBundle(pulsar.getNamespaceService().getBundle(topicName)).get();
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//load the nameserver, but topic is not init.
log.info("lookup:{}",admin.lookups().lookupTopic(topic));
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
});
//load the topic.
AbstractTopic topic1 = (AbstractTopic) pulsar.getBrokerService().getTopic(topic, true).get().get();
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(), Integer.valueOf(10));
}
@Test
public void testSetSizeBasedBacklogQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
for (int i = 0; i < testTopicPartitions; i++) {
String partition = TopicName.get(testTopic).getPartition(i).toString();
Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testSetTimeBasedBacklogQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(1000)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
for (int i = 0; i < testTopicPartitions; i++) {
String partition = TopicName.get(testTopic).getPartition(i).toString();
Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveSizeBasedBacklogQuota() throws Exception {
List<Topic> partitions = new ArrayList<>();
List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
for (int i = 0; i < testTopicPartitions; i++) {
String partition = TopicName.get(testTopic).getPartition(i).toString();
Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
partitions.add(topic);
BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
defaultBacklogQuotas.add(defaultBacklogQuota);
}
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
partitions.forEach(topic -> {
BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
});
admin.topicPolicies().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage)));
for (int i = 0; i < partitions.size(); i++) {
BacklogQuota backlogQuotaInTopic =
partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
testTopic);
Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveTimeBasedBacklogQuota() throws Exception {
List<Topic> partitions = new ArrayList<>();
List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
for (int i = 0; i < testTopicPartitions; i++) {
String partition = TopicName.get(testTopic).getPartition(i).toString();
Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
partitions.add(topic);
BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
defaultBacklogQuotas.add(defaultBacklogQuota);
}
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(1000)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
for (int i = 0; i < partitions.size(); i++) {
Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age),
backlogQuota);
//destination_storage should keep the same.
Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage),
defaultBacklogQuotas.get(i));
}
admin.topicPolicies().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age)));
for (int i = 0; i < partitions.size(); i++) {
BacklogQuota backlogQuotaInTopic =
partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
testTopic);
Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testCheckSizeBasedBacklogQuota() throws Exception {
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
String namespace = TopicName.get(testTopic).getNamespace();
admin.namespaces().setRetention(namespace, retentionPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(10 * 1024 * 1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
try {
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
backlogQuota = BacklogQuota.builder()
.limitSize(10 * 1024 * 1024 + 1)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
try {
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
backlogQuota = BacklogQuota.builder()
.limitSize(10 * 1024 * 1024 - 1)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
BacklogQuota finalBacklogQuota = backlogQuota;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), finalBacklogQuota));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testCheckTimeBasedBacklogQuota() throws Exception {
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
String namespace = TopicName.get(testTopic).getNamespace();
admin.namespaces().setRetention(namespace, retentionPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(10 * 60)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
try {
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
backlogQuota = BacklogQuota.builder()
.limitTime(10 * 60 + 1)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
try {
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
backlogQuota = BacklogQuota.builder()
.limitTime(10 * 60 - 1)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
BacklogQuota finalBacklogQuota = backlogQuota;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), finalBacklogQuota));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test(timeOut = 20000)
public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic), Maps.newHashMap());
assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap());
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf);
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), brokerQuotaMap);
BacklogQuota namespaceQuota = BacklogQuota.builder()
.limitSize(30)
.limitTime(10)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota);
Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap();
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota);
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, BacklogQuota.builder()
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), namespaceQuotaMap);
BacklogQuota topicQuota = BacklogQuota.builder()
.limitSize(40)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topicPolicies().getBacklogQuotaMap(topic).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap();
topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota);
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), topicQuotaMap);
admin.namespaces().removeBacklogQuota(myNamespace);
admin.topicPolicies().removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.destination_storage);
Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace)
.get(BacklogQuota.BacklogQuotaType.destination_storage) == null));
Awaitility.await().untilAsserted(() -> assertTrue(admin.topicPolicies().getBacklogQuotaMap(topic).isEmpty()));
assertTrue(admin.topicPolicies().getBacklogQuotaMap(topic, true)
.get(BacklogQuota.BacklogQuotaType.destination_storage) == null);
}
@Test(timeOut = 20000)
public void testGetTimeBasedBacklogQuotaApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic), Maps.newHashMap());
assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap());
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf);
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), brokerQuotaMap);
BacklogQuota namespaceQuota = BacklogQuota.builder()
.limitTime(30)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap();
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, namespaceQuota);
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder()
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), namespaceQuotaMap);
BacklogQuota topicQuota = BacklogQuota.builder()
.limitTime(40)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topicPolicies().getBacklogQuotaMap(topic).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap();
topicQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, topicQuota);
assertEquals(admin.topicPolicies().getBacklogQuotaMap(topic, true), topicQuotaMap);
admin.namespaces().removeBacklogQuota(myNamespace, BacklogQuota.BacklogQuotaType.message_age);
admin.topicPolicies().removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace)
.get(BacklogQuota.BacklogQuotaType.message_age) == null));
Awaitility.await().untilAsserted(() -> assertTrue(admin.topicPolicies().getBacklogQuotaMap(topic).isEmpty()));
assertTrue(admin.topicPolicies().getBacklogQuotaMap(topic, true)
.get(BacklogQuota.BacklogQuotaType.message_age) == null);
}
@Test
public void testCheckBacklogQuotaFailed() throws Exception {
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
String namespace = TopicName.get(testTopic).getNamespace();
admin.namespaces().setRetention(namespace, retentionPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(10 * 1024 * 1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
try {
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
//Ensure that the cache has not been updated after a long time
Awaitility.await().atLeast(1, TimeUnit.SECONDS);
assertNull(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage));
}
@Test
public void testCheckRetentionSizeBasedQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(10 * 1024 * 1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
RetentionPolicies retention = new RetentionPolicies(10, 10);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
try {
admin.topicPolicies().setRetention(testTopic, retention);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
retention = new RetentionPolicies(10, 9);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
try {
admin.topicPolicies().setRetention(testTopic, retention);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
retention = new RetentionPolicies(10, 12);
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
RetentionPolicies finalRetention = retention;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), finalRetention));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testCheckRetentionTimeBasedQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(10 * 60)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
RetentionPolicies retention = new RetentionPolicies(10, 10);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
try {
admin.topicPolicies().setRetention(testTopic, retention);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
retention = new RetentionPolicies(9, 10);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
try {
admin.topicPolicies().setRetention(testTopic, retention);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
retention = new RetentionPolicies(12, 10);
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
RetentionPolicies finalRetention = retention;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), finalRetention));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testSetRetention() throws Exception {
RetentionPolicies retention = new RetentionPolicies(60, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveRetention() throws Exception {
RetentionPolicies retention = new RetentionPolicies(60, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention));
admin.topicPolicies().removeRetention(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getRetention(testTopic)));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test(timeOut = 10000)
public void testRetentionAppliedApi() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
RetentionPolicies brokerPolicies =
new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB());
assertEquals(admin.topicPolicies().getRetention(topic, true), brokerPolicies);
RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20);
admin.namespaces().setRetention(myNamespace, namespacePolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getRetention(topic, true), namespacePolicies));
RetentionPolicies topicPolicies = new RetentionPolicies(20,30);
admin.topicPolicies().setRetention(topic, topicPolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getRetention(topic, true), topicPolicies));
admin.topicPolicies().removeRetention(topic);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getRetention(topic, true), namespacePolicies));
admin.namespaces().removeRetention(myNamespace);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getRetention(topic, true), brokerPolicies));
}
@Test(timeOut = 20000)
public void testGetSubDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic));
assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(conf.getDispatchThrottlingRatePerSubscriptionInMsg())
.dispatchThrottlingRateInByte(conf.getDispatchThrottlingRatePerSubscriptionInByte())
.ratePeriodInSecond(1)
.build();
assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(11)
.ratePeriodInSecond(12)
.build();
admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate);
DispatchRate topicDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(20)
.dispatchThrottlingRateInByte(21)
.ratePeriodInSecond(12)
.build();
admin.topicPolicies().setSubscriptionDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, true), topicDispatchRate);
admin.namespaces().removeSubscriptionDispatchRate(myNamespace);
admin.topicPolicies().removeSubscriptionDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
admin.namespaces().setSubscriptionDispatchRate(myNamespaceV1, namespaceDispatchRate);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespaceV1)));
admin.namespaces().removeSubscriptionDispatchRate(myNamespaceV1);
Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespaceV1)));
}
@Test(timeOut = 20000)
public void testRetentionPriority() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getRetention(topic));
assertNull(admin.namespaces().getRetention(myNamespace));
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained");
shouldTopicBeRetained.setAccessible(true);
Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
lastActive.setAccessible(true);
//set last active to 2 minutes ago
lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2));
//the default value of the broker-level is 0, so it is not retained by default
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//set namespace-level policy
RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
admin.namespaces().setRetention(myNamespace, retentionPolicies);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// set topic-level policy
admin.topicPolicies().setRetention(topic, new RetentionPolicies(3, 1));
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topicPolicies().getRetention(topic)));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//topic-level disabled
admin.topicPolicies().setRetention(topic, new RetentionPolicies(0, 0));
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getRetention(topic).getRetentionSizeInMB(), 0));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove topic-level policy
admin.topicPolicies().removeRetention(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topicPolicies().getRetention(topic)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//namespace-level disabled
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(0, 0));
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//change namespace-level policy
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(1, 1));
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove namespace-level policy
admin.namespaces().removeRetention(myNamespace);
Awaitility.await().untilAsserted(()
-> assertNull(admin.namespaces().getRetention(myNamespace)));
//the default value of the broker-level is 0, so it is not retained by default
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
}
@Test(timeOut = 20000)
public void testGetPersistenceApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getPersistence(topic));
assertNull(admin.namespaces().getPersistence(myNamespace));
PersistencePolicies brokerPolicy
= new PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
assertEquals(admin.topicPolicies().getPersistence(topic, true), brokerPolicy);
PersistencePolicies namespacePolicy
= new PersistencePolicies(5,4,3,2);
admin.namespaces().setPersistence(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getPersistence(myNamespace)));
assertEquals(admin.topicPolicies().getPersistence(topic, true), namespacePolicy);
PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1);
admin.topicPolicies().setPersistence(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getPersistence(topic)));
assertEquals(admin.topicPolicies().getPersistence(topic, true), topicPolicy);
admin.namespaces().removePersistence(myNamespace);
admin.topicPolicies().removePersistence(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getPersistence(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getPersistence(topic)));
assertEquals(admin.topicPolicies().getPersistence(topic, true), brokerPolicy);
}
@Test
public void testCheckPersistence() throws Exception {
PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
try {
admin.topicPolicies().setPersistence(testTopic, persistencePolicies);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 400);
}
persistencePolicies = new PersistencePolicies(2, 6, 2, 0.0);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
try {
admin.topicPolicies().setPersistence(testTopic, persistencePolicies);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 400);
}
persistencePolicies = new PersistencePolicies(2, 2, 6, 0.0);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
try {
admin.topicPolicies().setPersistence(testTopic, persistencePolicies);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 400);
}
persistencePolicies = new PersistencePolicies(1, 2, 2, 0.0);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
try {
admin.topicPolicies().setPersistence(testTopic, persistencePolicies);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 400);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testSetPersistence() throws Exception {
PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace));
PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);
admin.topics().createNonPartitionedTopic(persistenceTopic);
admin.topicPolicies().setPersistence(persistenceTopic, persistencePolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPersistence(persistenceTopic), persistencePolicies));
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(persistenceTopic)
.subscriptionName("test")
.subscribe();
admin.topics().unload(persistenceTopic);
Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get();
PersistentTopic persistentTopic = (PersistentTopic) t;
Awaitility.await().untilAsserted(() -> {
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
assertEquals(managedLedgerConfig.getEnsembleSize(), 3);
assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3);
assertEquals(managedLedgerConfig.getAckQuorumSize(), 3);
assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1);
});
PersistencePolicies getPersistencePolicies = admin.topicPolicies().getPersistence(persistenceTopic);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);
Assert.assertEquals(getPersistencePolicies, persistencePolicies);
consumer.close();
}
@Test
public void testGetDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getDispatchRate(topic));
assertNull(admin.namespaces().getDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(conf.getDispatchThrottlingRatePerTopicInMsg())
.dispatchThrottlingRateInByte(conf.getDispatchThrottlingRatePerTopicInByte())
.ratePeriodInSecond(1)
.build();
assertEquals(admin.topicPolicies().getDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(11)
.ratePeriodInSecond(12)
.build();
admin.namespaces().setDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDispatchRate(myNamespace)));
assertEquals(admin.topicPolicies().getDispatchRate(topic, true), namespaceDispatchRate);
DispatchRate topicDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(20)
.dispatchThrottlingRateInByte(21)
.ratePeriodInSecond(22)
.build();
admin.topicPolicies().setDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getDispatchRate(topic)));
assertEquals(admin.topicPolicies().getDispatchRate(topic, true), topicDispatchRate);
admin.namespaces().removeDispatchRate(myNamespace);
admin.topicPolicies().removeDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getDispatchRate(topic)));
assertEquals(admin.topicPolicies().getDispatchRate(topic, true), brokerDispatchRate);
}
@Test
public void testRemovePersistence() throws Exception {
PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace));
PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);
admin.topics().createNonPartitionedTopic(persistenceTopic);
admin.topicPolicies().setPersistence(persistenceTopic, persistencePolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPersistence(persistenceTopic), persistencePolicies));
admin.topicPolicies().removePersistence(persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getPersistence(persistenceTopic)));
admin.lookups().lookupTopic(persistenceTopic);
Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get();
PersistentTopic persistentTopic = (PersistentTopic) t;
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
assertEquals(managedLedgerConfig.getEnsembleSize(), 2);
assertEquals(managedLedgerConfig.getWriteQuorumSize(), 2);
assertEquals(managedLedgerConfig.getAckQuorumSize(), 2);
assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.3);
}
@Test
public void testCheckMaxProducers() throws Exception {
int maxProducers = -1;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers, testTopic);
try {
admin.topicPolicies().setMaxProducers(testTopic, maxProducers);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetMaxProducerApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getMaxProducers(topic));
assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace));
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());
admin.namespaces().setMaxProducersPerTopic(myNamespace, 7);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)));
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), 7);
admin.topicPolicies().setMaxProducers(topic, 1000);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getMaxProducers(topic)));
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), 1000);
admin.namespaces().removeMaxProducersPerTopic(myNamespace);
admin.topicPolicies().removeMaxProducers(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getMaxProducers(topic)));
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());
}
private void waitTopicPoliciesApplied(String topic, int partitions,
java.util.function.Consumer<HierarchyTopicPolicies> condition) {
TopicName topicName = TopicName.get(topic);
if (partitions > 0) {
for (int i = 0; i < partitions; i++) {
String partition = topicName.getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(partition).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
} else {
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
}
@Test
public void testSetMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);
//broker level setting is 4
conf.setMaxProducersPerTopic(4);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
//ns level setting is 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
//topic level setting is 2
admin.topicPolicies().setMaxProducers(persistenceTopic, maxProducers);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 2);
});
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic),
maxProducers));
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer3 = null;
try {
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
Assert.assertNotNull(producer1);
Assert.assertNotNull(producer2);
Assert.assertNull(producer3);
admin.topicPolicies().removeMaxProducers(persistenceTopic);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer4;
try {
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
admin.namespaces().removeMaxProducersPerTopic(myNamespace);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
try {
Producer<byte[]> producer5 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
producer1.close();
producer2.close();
producer3.close();
producer4.close();
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
admin.topicPolicies().setMaxProducers(persistenceTopic, maxProducers);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic), maxProducers));
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer3 = null;
Producer<byte[]> producer4 = null;
try {
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
Assert.assertNotNull(producer1);
Assert.assertNotNull(producer2);
Assert.assertNull(producer3);
admin.topicPolicies().removeMaxProducers(persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getMaxProducers(persistenceTopic)));
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.assertNotNull(producer3);
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(), 3));
log.info("MaxProducers: {} will set to the namespace: {}", 3, myNamespace);
try {
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on namespace level.");
}
Assert.assertNull(producer4);
producer1.close();
producer2.close();
producer3.close();
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetSetDispatchRate() throws Exception {
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(100)
.dispatchThrottlingRateInByte(1000)
.ratePeriodInSecond(1)
.relativeToPublishRate(true)
.build();
log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
admin.topicPolicies().setDispatchRate(testTopic, dispatchRate);
log.info("Dispatch Rate set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getDispatchRate(testTopic), dispatchRate));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveDispatchRate() throws Exception {
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(100)
.dispatchThrottlingRateInByte(1000)
.ratePeriodInSecond(1)
.relativeToPublishRate(true)
.build();
log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
admin.topicPolicies().setDispatchRate(testTopic, dispatchRate);
log.info("Dispatch Rate set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getDispatchRate(testTopic), dispatchRate));
admin.topicPolicies().removeDispatchRate(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDispatchRate(testTopic)));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test(timeOut = 20000)
public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(200)
.dispatchThrottlingRateInByte(20000)
.ratePeriodInSecond(1)
.relativeToPublishRate(true)
.build();
admin.namespaces().setDispatchRate(myNamespace, dispatchRate);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getDispatchRate(myNamespace).getDispatchThrottlingRateInMsg(), 200));
dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(100)
.dispatchThrottlingRateInByte(10000)
.ratePeriodInSecond(1)
.relativeToPublishRate(true)
.build();
admin.topicPolicies().setDispatchRate(topic, dispatchRate);
Awaitility.await()
.untilAsserted(() -> Assert.assertNotNull(admin.topicPolicies().getDispatchRate(topic)));
//1 Set ns level policy, topic level should not be overwritten
dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(300)
.dispatchThrottlingRateInByte(30000)
.ratePeriodInSecond(2)
.relativeToPublishRate(true)
.build();
admin.namespaces().setDispatchRate(myNamespace, dispatchRate);
Awaitility.await()
.untilAsserted(() -> {
DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000);
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100);
});
admin.topicPolicies().removeDispatchRate(topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDispatchRate(topic)));
//2 Remove level policy, DispatchRateLimiter should us ns level policy
Awaitility.await()
.untilAsserted(() -> {
DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000);
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300);
});
}
@Test(timeOut = 20000)
public void testRestart() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
// set namespace level inactive topic policies
InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));
// set namespace retention policies
final RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
admin.namespaces().setRetention(myNamespace, retentionPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(myNamespace),
retentionPolicies));
// set topic level inactive topic policies
inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic),
finalInactiveTopicPolicies));
// set topic level retention policies
final RetentionPolicies finalRetentionPolicies = new RetentionPolicies(20, -1);
admin.topicPolicies().setRetention(topic, finalRetentionPolicies);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(topic), finalRetentionPolicies));
// restart broker, policy should still take effect
restartBroker();
// Trigger the cache init.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
// check inactive topic policies and retention policies.
Awaitility.await()
.untilAsserted(() -> {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies);
Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(),
finalRetentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(finalRetentionPolicies.getRetentionTimeInMinutes()));
});
producer.close();
}
@Test
public void testGetSetSubscriptionDispatchRate() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1024 * 1024)
.ratePeriodInSecond(1)
.build();
log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
admin.topicPolicies().setSubscriptionDispatchRate(topic, dispatchRate);
log.info("Subscription dispatch rate set success on topic: {}", topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic), dispatchRate));
String subscriptionName = "test_subscription_rate";
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotNull(dispatchRateLimiter);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInMsg());
consumer.close();
admin.topics().delete(topic, true);
}
@Test
public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1024 * 1024)
.ratePeriodInSecond(1)
.build();
log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
String subscriptionName = "test_subscription_rate";
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
admin.topicPolicies().setSubscriptionDispatchRate(topic, dispatchRate);
log.info("Subscription dispatch rate set success on topic: {}", topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic), dispatchRate));
DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotNull(dispatchRateLimiter);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInMsg());
consumer.close();
admin.topics().delete(topic, true);
}
@Test
public void testRemoveSubscriptionDispatchRate() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1024 * 1024)
.ratePeriodInSecond(1)
.build();
log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
admin.topicPolicies().setSubscriptionDispatchRate(topic, dispatchRate);
log.info("Subscription dispatch rate set success on topic: {}", topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic), dispatchRate));
String subscriptionName = "test_subscription_rate";
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotNull(dispatchRateLimiter);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInMsg());
// remove subscription dispatch rate
admin.topicPolicies().removeSubscriptionDispatchRate(topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic)));
Awaitility.await().untilAsserted(() -> {
DispatchRateLimiter drl = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotEquals(drl.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInMsg());
Assert.assertNotEquals(drl.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
});
consumer.close();
admin.topics().delete(topic, true);
}
@Test
public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
// set namespace level subscription dispatch rate
DispatchRate namespaceDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(100)
.dispatchThrottlingRateInByte(1024 * 1024)
.ratePeriodInSecond(1)
.build();
admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(myNamespace), namespaceDispatchRate));
String subscriptionName = "test_subscription_rate";
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
// get subscription dispatch Rate limiter
DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.getDispatchThrottlingRateInMsg());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.getDispatchThrottlingRateInByte());
// set topic level subscription dispatch rate
DispatchRate topicDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(200)
.dispatchThrottlingRateInByte(2 * 1024 * 1024)
.ratePeriodInSecond(1)
.build();
admin.topicPolicies().setSubscriptionDispatchRate(topic, topicDispatchRate);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscriptionDispatchRate(topic), topicDispatchRate));
// get subscription dispatch rate limiter
dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
.getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.getDispatchThrottlingRateInByte());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.getDispatchThrottlingRateInMsg());
// remove topic level subscription dispatch rate limiter
admin.topicPolicies().removeSubscriptionDispatchRate(topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getSubscriptionDispatchRate(topic)));
// get subscription dispatch rate limiter
dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
.getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.getDispatchThrottlingRateInByte());
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.getDispatchThrottlingRateInMsg());
consumer.close();
admin.topics().delete(topic, true);
}
@Test
public void testGetSetCompactionThreshold() throws Exception {
Long compactionThreshold = 100000L;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
admin.topicPolicies().setCompactionThreshold(testTopic, compactionThreshold);
log.info("Compaction threshold set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getCompactionThreshold(testTopic), compactionThreshold));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveCompactionThreshold() throws Exception {
Long compactionThreshold = 100000L;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
admin.topicPolicies().setCompactionThreshold(testTopic, compactionThreshold);
log.info("Compaction threshold set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getCompactionThreshold(testTopic), compactionThreshold));
admin.topicPolicies().removeCompactionThreshold(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getCompactionThreshold(testTopic)));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetSetMaxConsumersPerSubscription() throws Exception {
Integer maxConsumersPerSubscription = 10;
log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
admin.topicPolicies().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveMaxConsumersPerSubscription() throws Exception {
Integer maxConsumersPerSubscription = 10;
log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
admin.topicPolicies().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription));
admin.topicPolicies().removeMaxConsumersPerSubscription(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getMaxConsumersPerSubscription(testTopic)));
admin.namespaces().setMaxConsumersPerSubscription(myNamespaceV1, 20);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespaceV1).intValue(), 20));
admin.namespaces().removeMaxConsumersPerSubscription(myNamespaceV1);
Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespaceV1)));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetSetPublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
admin.topicPolicies().setPublishRate(testTopic, publishRate);
log.info("Publish Rate set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPublishRate(testTopic), publishRate));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemovePublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
admin.topicPolicies().setPublishRate(testTopic, publishRate);
log.info("Publish Rate set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getPublishRate(testTopic), publishRate));
admin.topicPolicies().removePublishRate(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getPublishRate(testTopic)));
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testCheckMaxConsumers() throws Exception {
Integer maxProducers = -1;
log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic);
try {
admin.topicPolicies().setMaxConsumers(testTopic, maxProducers);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetMaxConsumersApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getMaxConsumers(topic));
assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
assertEquals(admin.topicPolicies().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 7);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)));
assertEquals(admin.topicPolicies().getMaxConsumers(topic, true).intValue(), 7);
admin.topicPolicies().setMaxConsumers(topic, 1000);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getMaxConsumers(topic)));
assertEquals(admin.topicPolicies().getMaxConsumers(topic, true).intValue(), 1000);
admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
admin.topicPolicies().removeMaxConsumers(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getMaxConsumers(topic)));
assertEquals(admin.topicPolicies().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());
}
@Test
public void testSetMaxConsumers() throws Exception {
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(myNamespace).intValue(), 1));
log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
Integer maxConsumers = 2;
log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
admin.topicPolicies().setMaxConsumers(persistenceTopic, maxConsumers);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxConsumers(persistenceTopic), maxConsumers));
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
Consumer<byte[]> consumer3 = null;
try {
consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max consumers limit");
}
Assert.assertNotNull(consumer1);
Assert.assertNotNull(consumer2);
Assert.assertNull(consumer3);
consumer1.close();
consumer2.close();
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemoveMaxConsumers() throws Exception {
Integer maxConsumers = 2;
admin.topics().createPartitionedTopic(persistenceTopic, 2);
admin.topicPolicies().setMaxConsumers(persistenceTopic, maxConsumers);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxConsumers(persistenceTopic), maxConsumers));
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
Consumer<byte[]> consumer3 = null;
try {
consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max consumers limit");
}
Assert.assertNotNull(consumer1);
Assert.assertNotNull(consumer2);
Assert.assertNull(consumer3);
admin.topicPolicies().removeMaxConsumers(persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getMaxConsumers(persistenceTopic)));
consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
Assert.assertNotNull(consumer3);
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
log.info("MaxConsumers: {} will set to the namespace: {}", 3, myNamespace);
Consumer<byte[]> consumer4 = null;
try {
consumer4 = pulsarClient.newConsumer().subscriptionName("sub4").topic(persistenceTopic).subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max consumers limit on namespace level.");
}
Assert.assertNull(consumer4);
consumer1.close();
consumer2.close();
consumer3.close();
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testDisableSubscribeRate() throws Exception {
assertEquals(pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(), 0);
admin.topics().createNonPartitionedTopic(persistenceTopic);
admin.lookups().lookupTopic(persistenceTopic);
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(persistenceTopic).get().get();
Field field = PersistentTopic.class.getDeclaredField("subscribeRateLimiter");
field.setAccessible(true);
Optional<SubscribeRateLimiter> limiter = (Optional<SubscribeRateLimiter>) field.get(topic);
// sub rate limiter should be null by default
assertFalse(limiter.isPresent());
// Enable / Disable subscribe rate in namespace-level
final SubscribeRate subscribeRate = new SubscribeRate(1, 100);
admin.namespaces().setSubscribeRate(myNamespace, subscribeRate);
Awaitility.await().untilAsserted(()-> {
Optional<SubscribeRateLimiter> limiter1 = (Optional<SubscribeRateLimiter>) field.get(topic);
assertTrue(limiter1.isPresent());
});
admin.namespaces().removeSubscribeRate(myNamespace);
Awaitility.await().untilAsserted(()-> {
Optional<SubscribeRateLimiter> limiter2 = (Optional<SubscribeRateLimiter>) field.get(topic);
assertFalse(limiter2.isPresent());
});
// Enable / Disable subscribe rate in topic-level
admin.topicPolicies().setSubscribeRate(persistenceTopic, subscribeRate);
Awaitility.await().untilAsserted(()-> {
Optional<SubscribeRateLimiter> limiter1 = (Optional<SubscribeRateLimiter>) field.get(topic);
assertTrue(limiter1.isPresent());
});
admin.topicPolicies().removeSubscribeRate(persistenceTopic);
Awaitility.await().untilAsserted(()-> {
Optional<SubscribeRateLimiter> limiter2 = (Optional<SubscribeRateLimiter>) field.get(topic);
assertFalse(limiter2.isPresent());
});
}
@Test
public void testGetSetSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
SubscribeRate subscribeRate1 = new SubscribeRate(1, 30);
log.info("Subscribe Rate: {} will be set to the namespace: {}", subscribeRate1, myNamespace);
admin.namespaces().setSubscribeRate(myNamespace, subscribeRate1);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscribeRate(myNamespace), subscribeRate1));
SubscribeRate subscribeRate2 = new SubscribeRate(2, 30);
log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate2, persistenceTopic);
admin.topicPolicies().setSubscribeRate(persistenceTopic, subscribeRate2);
log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscribeRate(persistenceTopic), subscribeRate2));
PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
Consumer consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer1);
consumer1.close();
pulsarClient1.shutdown();
Consumer consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer2);
consumer2.close();
pulsarClient2.shutdown();
Consumer consumer3 = null;
try {
consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("subscribe rate reached max subscribe rate limit");
}
Assert.assertNull(consumer3);
pulsarClient3.shutdown();
admin.topics().deletePartitionedTopic(testTopic, true);
admin.topics().deletePartitionedTopic(persistenceTopic, true);
}
@Test(timeOut = 20000)
public void testGetSubscribeRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getSubscribeRate(topic));
assertNull(admin.namespaces().getSubscribeRate(myNamespace));
SubscribeRate brokerPolicy = new SubscribeRate(
pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
);
assertEquals(admin.topicPolicies().getSubscribeRate(topic, true), brokerPolicy);
SubscribeRate namespacePolicy = new SubscribeRate(10, 11);
admin.namespaces().setSubscribeRate(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscribeRate(myNamespace)));
assertEquals(admin.topicPolicies().getSubscribeRate(topic, true), namespacePolicy);
SubscribeRate topicPolicy = new SubscribeRate(20, 21);
admin.topicPolicies().setSubscribeRate(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getSubscribeRate(topic)));
assertEquals(admin.topicPolicies().getSubscribeRate(topic, true), topicPolicy);
admin.namespaces().removeSubscribeRate(myNamespace);
admin.topicPolicies().removeSubscribeRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscribeRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getSubscribeRate(topic)));
assertEquals(admin.topicPolicies().getSubscribeRate(topic, true), brokerPolicy);
}
@Test(timeOut = 30000)
public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
final String topic = testTopic + UUID.randomUUID();
int maxConsumerInBroker = 1;
int maxConsumerInNs = 2;
int maxConsumerInTopic = 4;
String mySub = "my-sub";
conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
pulsarClient.newProducer().topic(topic).create().close();
List<Consumer<String>> consumerList = new ArrayList<>();
ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.topic(topic).subscriptionName(mySub);
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
//disabled
admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0));
consumerList.add(builder.subscribe());
//set topic-level
admin.topicPolicies().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topicPolicies().getMaxConsumersPerSubscription(topic)));
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
//remove topic policies
admin.topicPolicies().removeMaxConsumersPerSubscription(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin.topicPolicies().getMaxConsumersPerSubscription(topic)));
consumerList.add(builder.subscribe());
//remove namespace policies, then use broker-level
admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
for (Consumer<String> consumer : consumerList) {
consumer.close();
}
}
@Test
public void testRemoveSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
pulsarClient.newProducer().topic(persistenceTopic).create().close();
SubscribeRate subscribeRate = new SubscribeRate(2, 30);
log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic);
admin.topicPolicies().setSubscribeRate(persistenceTopic, subscribeRate);
log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getSubscribeRate(persistenceTopic), subscribeRate));
PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer1);
consumer1.close();
pulsarClient1.shutdown();
Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer2);
consumer2.close();
pulsarClient2.shutdown();
Consumer<byte[]> consumer3 = null;
try {
consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("subscribe rate reached max subscribe rate limit");
}
Assert.assertNull(consumer3);
admin.topicPolicies().removeSubscribeRate(persistenceTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getSubscribeRate(persistenceTopic)));
admin.topics().unload(persistenceTopic);
PulsarClient pulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient5 = newPulsarClient(lookupUrl.toString(), 0);
PulsarClient pulsarClient6 = newPulsarClient(lookupUrl.toString(), 0);
consumer3 = pulsarClient3.newConsumer().subscriptionName("sub2")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer3);
consumer3.close();
pulsarClient3.shutdown();
Consumer<byte[]> consumer4 = pulsarClient4.newConsumer().subscriptionName("sub2")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer4);
consumer4.close();
pulsarClient4.shutdown();
Consumer<byte[]> consumer5 = pulsarClient5.newConsumer().subscriptionName("sub2")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer5);
consumer5.close();
pulsarClient5.shutdown();
Consumer<byte[]> consumer6 = pulsarClient6.newConsumer().subscriptionName("sub2")
.topic(persistenceTopic).consumerName("test").subscribe();
Assert.assertNotNull(consumer6);
consumer6.close();
pulsarClient6.shutdown();
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testPublishRateInDifferentLevelPolicy() throws Exception {
cleanup();
conf.setMaxPublishRatePerTopicInMessages(5);
conf.setMaxPublishRatePerTopicInBytes(50L);
setup();
final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
pulsarClient.newProducer().topic(topicName).create().close();
Field publishMaxMessageRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate");
publishMaxMessageRate.setAccessible(true);
Field publishMaxByteRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxByteRate");
publishMaxByteRate.setAccessible(true);
//1 use broker-level policy by default
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
PublishRateLimiterImpl publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
//2 set namespace-level policy
PublishRate publishMsgRate = new PublishRate(10, 100L);
admin.namespaces().setPublishRate(myNamespace, publishMsgRate);
Awaitility.await()
.until(() -> {
PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
return (int)publishMaxMessageRate.get(limiter) == 10;
});
publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L);
//3 set topic-level policy, namespace-level policy should be overwritten
PublishRate publishMsgRate2 = new PublishRate(11, 101L);
admin.topicPolicies().setPublishRate(topicName, publishMsgRate2);
Awaitility.await()
.until(() -> admin.topicPolicies().getPublishRate(topicName) != null);
publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 11);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 101L);
//4 remove topic-level policy, namespace-level policy will take effect
admin.topicPolicies().removePublishRate(topicName);
Awaitility.await()
.until(() -> admin.topicPolicies().getPublishRate(topicName) == null);
publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L);
//5 remove namespace-level policy, broker-level policy will take effect
admin.namespaces().removePublishRate(myNamespace);
Awaitility.await()
.until(() -> {
PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
return (int)publishMaxMessageRate.get(limiter) == 5;
});
publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
}
@Test(timeOut = 20000)
public void testTopicMaxMessageSizeApi() throws Exception{
admin.topics().createNonPartitionedTopic(persistenceTopic);
assertNull(admin.topicPolicies().getMaxMessageSize(persistenceTopic));
admin.topicPolicies().setMaxMessageSize(persistenceTopic,10);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null);
assertEquals(admin.topicPolicies().getMaxMessageSize(persistenceTopic).intValue(),10);
admin.topicPolicies().removeMaxMessageSize(persistenceTopic);
assertNull(admin.topicPolicies().getMaxMessageSize(persistenceTopic));
try {
admin.topicPolicies().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(),412);
}
try {
admin.topicPolicies().setMaxMessageSize(persistenceTopic, -1);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(),412);
}
}
@DataProvider(name = "persistentAndPartition")
public Object[][] implementations() {
return new Object[][]{
{TopicDomain.persistent, true},
{TopicDomain.persistent, false},
{TopicDomain.non_persistent, true},
{TopicDomain.non_persistent, false},
};
}
@Test(dataProvider = "persistentAndPartition")
public void testTopicMaxMessageSize(TopicDomain topicDomain, boolean isPartitioned) throws Exception {
final String topic = TopicName.get(
topicDomain.value(),
NamespaceName.get(myNamespace),
"test-" + UUID.randomUUID()
).toString();
if (isPartitioned) {
admin.topics().createPartitionedTopic(topic, 3);
}
// init cache
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
assertNull(admin.topicPolicies().getMaxMessageSize(topic));
// set msg size
admin.topicPolicies().setMaxMessageSize(topic, 10);
Awaitility.await().until(() -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
if (isPartitioned) {
for (int i = 0; i <3; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
AbstractTopic partition =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(partitionName).get().get();
assertEquals(partition.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(),
Integer.valueOf(10));
});
}
} else {
Awaitility.await().untilAsserted(() -> {
AbstractTopic abstractTopic =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertEquals(abstractTopic.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(),
Integer.valueOf(10));
});
}
assertEquals(admin.topicPolicies().getMaxMessageSize(topic).intValue(), 10);
try {
producer.send(new byte[1024]);
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.NotAllowedException);
}
admin.topicPolicies().removeMaxMessageSize(topic);
assertNull(admin.topicPolicies().getMaxMessageSize(topic));
try {
admin.topicPolicies().setMaxMessageSize(topic, Integer.MAX_VALUE);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}
try {
admin.topicPolicies().setMaxMessageSize(topic, -1);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}
//make sure policy value take effect.
if (isPartitioned) {
for (int i = 0; i <3; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
AbstractTopic partition =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(partitionName).get().get();
assertNull(partition.getHierarchyTopicPolicies().getTopicMaxMessageSize().getTopicValue());
});
}
} else {
Awaitility.await().untilAsserted(() -> {
AbstractTopic abstractTopic =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertNull(abstractTopic.getHierarchyTopicPolicies().getTopicMaxMessageSize().getTopicValue());
});
}
Awaitility.await().untilAsserted(() -> {
try {
MessageId messageId = producer.send(new byte[1024]);
assertNotNull(messageId);
} catch (PulsarClientException e) {
fail("failed to send message");
}
});
producer.close();
}
@Test(timeOut = 20000)
public void testMaxSubscriptionsFailFast() throws Exception {
doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable);
doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable);
}
private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
int maxSubInNamespace = 2;
List<Consumer> consumers = new ArrayList<>();
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().subscriptionMode(subMode)
.subscriptionType(SubscriptionType.Shared).topic(topic);
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSubInNamespace);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)));
for (int i = 0; i < maxSubInNamespace; i++) {
consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe());
}
long start = System.currentTimeMillis();
try {
consumerBuilder.subscriptionName("sub").subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.NotAllowedException);
}
//fail fast
assertTrue(System.currentTimeMillis() - start < 3000);
//clean
for (Consumer consumer : consumers) {
consumer.close();
}
}
@Test(timeOut = 20000)
public void testMaxSubscriptionsPerTopicApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getMaxSubscriptionsPerTopic(topic));
// set max subscriptions
admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, 10);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
assertEquals(admin.topicPolicies().getMaxSubscriptionsPerTopic(topic).intValue(), 10);
// remove max subscriptions
admin.topicPolicies().removeMaxSubscriptionsPerTopic(topic);
assertNull(admin.topicPolicies().getMaxSubscriptionsPerTopic(topic));
// set invalidate value
try {
admin.topicPolicies().setMaxMessageSize(topic, -1);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}
}
@Test(timeOut = 20000)
public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
// Set topic-level max subscriptions
final int topicLevelMaxSubNum = 2;
admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
List<Consumer<String>> consumerList = new ArrayList<>();
String subName = "my-sub-";
for (int i = 0; i < topicLevelMaxSubNum; i++) {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName + i)
.topic(topic).subscribe();
consumerList.add(consumer);
}
// should fail
try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build()) {
consumerList.add(client.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe());
fail("should fail");
} catch (PulsarClientException ignore) {
assertEquals(consumerList.size(), topicLevelMaxSubNum);
}
//create a consumer with the same subscription name, it should succeed
pulsarClient.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName + "0")
.topic(topic).subscribe().close();
//Clean up
for (Consumer<String> c : consumerList) {
c.close();
}
}
@Test
public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
cleanup();
conf.setMaxUnackedMessagesPerSubscription(30);
setup();
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
@Cleanup
Producer producer = pulsarClient.newProducer().topic(topic).create();
//default value is null
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
int msgNum = 100;
int maxUnackedMsgOnTopic = 10;
int maxUnackedMsgNumOnNamespace = 5;
int defaultMaxUnackedMsgOnBroker = pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
produceMsg(producer, msgNum);
//set namespace-level policy, the restriction should take effect
admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), maxUnackedMsgNumOnNamespace));
List<Message<?>> messages;
String subName = "sub-" + UUID.randomUUID();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName).receiverQueueSize(1)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared);
@Cleanup
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
messages = getMsgReceived(consumer1, msgNum);
assertEquals(messages.size(), maxUnackedMsgNumOnNamespace);
ackMessages(consumer1, messages);
//disable namespace-level policy, should unlimited
admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, 0);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), 0));
messages = getMsgReceived(consumer1, 40);
assertEquals(messages.size(), 40);
ackMessages(consumer1, messages);
//set topic-level and namespace-level policy, topic-level should has higher priority
admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue()
, maxUnackedMsgNumOnNamespace));
admin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, maxUnackedMsgOnTopic);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));
//check the value applied
PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertEquals(persistentTopic.getMaxUnackedMessagesOnSubscription(), maxUnackedMsgOnTopic);
messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
assertEquals(messages.size(), maxUnackedMsgOnTopic);
ackMessages(consumer1, messages);
//remove both namespace-level and topic-level policy, broker-level should take effect
admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace);
admin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
Awaitility.await().until(()
-> admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace) == null
&& admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic) == null);
messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker);
}
private void produceMsg(Producer producer, int msgNum) throws Exception{
for (int i = 0; i < msgNum; i++) {
producer.send("msg".getBytes());
}
}
private List<Message<?>> getMsgReceived(Consumer<byte[]> consumer1, int msgNum) throws PulsarClientException {
List<Message<?>> messages = new ArrayList<>();
for (int i = 0; i < msgNum; i++) {
Message<?> message = consumer1.receive(1000, TimeUnit.MILLISECONDS);
if (message == null) {
break;
}
messages.add(message);
}
return messages;
}
private void ackMessages(Consumer<?> consumer, List<Message<?>> messages) throws Exception{
for (Message<?> message : messages) {
consumer.acknowledge(message);
}
}
@Test(timeOut = 20000)
public void testMaxSubscriptionsPerTopic() throws Exception {
int brokerLevelMaxSub = 4;
conf.setMaxSubscriptionsPerTopic(4);
restartBroker();
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
// Set topic-level max subscriptions
final int topicLevelMaxSubNum = 2;
admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
List<Consumer<String>> consumerList = new ArrayList<>();
for (int i = 0; i < topicLevelMaxSubNum; i++) {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe();
consumerList.add(consumer);
}
try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS)
.serviceUrl(brokerUrl.toString()).build()) {
consumerList.add(client.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe());
fail("should fail");
} catch (PulsarClientException ignore) {
assertEquals(consumerList.size(), topicLevelMaxSubNum);
}
// Set namespace-level policy, but will not take effect
final int namespaceLevelMaxSub = 3;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, namespaceLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Awaitility.await().until(() -> Integer.valueOf(namespaceLevelMaxSub)
.equals(persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().getNamespaceValue()));
try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
.serviceUrl(brokerUrl.toString()).build()) {
client.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException ignore) {
assertEquals(consumerList.size(), topicLevelMaxSubNum);
}
//Removed topic-level policy, namespace-level should take effect
admin.topicPolicies().removeMaxSubscriptionsPerTopic(topic);
consumerList.add(pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
assertEquals(consumerList.size(), namespaceLevelMaxSub);
try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
.serviceUrl(brokerUrl.toString()).build()) {
consumerList.add(client.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe());
fail("should fail");
} catch (PulsarClientException ignore) {
assertEquals(consumerList.size(), namespaceLevelMaxSub);
}
//Removed namespace-level policy, broker-level should take effect
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> persistentTopic
.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic()
.getNamespaceValue() == null);
consumerList.add(pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
assertEquals(consumerList.size(), brokerLevelMaxSub);
try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
.serviceUrl(brokerUrl.toString()).build()) {
consumerList.add(client.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString())
.topic(topic).subscribe());
fail("should fail");
} catch (PulsarClientException ignore) {
assertEquals(consumerList.size(), brokerLevelMaxSub);
}
//Clean up
for (Consumer<String> c : consumerList) {
c.close();
}
}
@Test(timeOut = 30000)
public void testReplicatorRateApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getReplicatorDispatchRate(topic));
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(100)
.dispatchThrottlingRateInByte(200)
.ratePeriodInSecond(10)
.build();
admin.topicPolicies().setReplicatorDispatchRate(topic, dispatchRate);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate));
admin.topicPolicies().removeReplicatorDispatchRate(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topicPolicies().getReplicatorDispatchRate(topic)));
}
@Test(timeOut = 20000)
public void testGetReplicatorRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getReplicatorDispatchRate(topic));
assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg())
.dispatchThrottlingRateInByte(pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte())
.ratePeriodInSecond(1)
.build();
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(11)
.ratePeriodInSecond(12)
.build();
admin.namespaces().setReplicatorDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic, true), namespaceDispatchRate);
DispatchRate topicDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(20)
.dispatchThrottlingRateInByte(21)
.ratePeriodInSecond(22)
.build();
admin.topicPolicies().setReplicatorDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getReplicatorDispatchRate(topic)));
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic, true), topicDispatchRate);
admin.namespaces().removeReplicatorDispatchRate(myNamespace);
admin.topicPolicies().removeReplicatorDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getReplicatorDispatchRate(topic)));
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
}
@Test(timeOut = 30000)
public void testAutoCreationDisabled() throws Exception {
cleanup();
conf.setAllowAutoTopicCreation(false);
setup();
final String topic = testTopic + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
pulsarClient.newProducer().topic(topic).create().close();
//should not fail
assertNull(admin.topicPolicies().getMessageTTL(topic));
}
@Test
public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 1);
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
subscriptionTypeSet.add(SubscriptionType.Key_Shared);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getSubscriptionTypesEnabled(topic)));
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(TopicName.get(topic).getPartition(0).toString()).get();
Set<String> old = new HashSet<>(pulsar.getConfiguration().getSubscriptionTypesEnabled());
try {
pulsar.getConfiguration().getSubscriptionTypesEnabled().clear();
assertTrue(persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared));
} finally {
//restore
pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
}
Awaitility.await().untilAsserted(() ->
assertFalse(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
admin.topics().removeSubscriptionTypesEnabled(topic);
Awaitility.await().untilAsserted(() ->
assertTrue(admin.topics().getSubscriptionTypesEnabled(topic).isEmpty()));
}
@Test(timeOut = 30000)
public void testSubscriptionTypesEnabled() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// use broker.conf
pulsar.getConfiguration().setSubscriptionTypesEnabled(Sets.newHashSet("Exclusive"));
admin.topics().createNonPartitionedTopic(topic);
try {
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("test").subscribe();
fail();
} catch (PulsarClientException pulsarClientException) {
assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
}
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Exclusive).subscriptionName("test")
.subscribe().close();
//update broker level dynamic update config
admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled", "Shared");
Awaitility.await().untilAsserted(()->{
assertTrue(pulsar.getConfiguration().getSubscriptionTypesEnabled().contains("Shared"));
});
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("test").subscribe().close();
assertNull(admin.topicPolicies().getSubscriptionTypesEnabled(topic));
// set enable failover sub type
Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
subscriptionTypeSet.add(SubscriptionType.Failover);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().get()
.contains(CommandSubscribe.SubType.Failover));
});
subscriptionTypeSet = admin.topicPolicies().getSubscriptionTypesEnabled(topic);
assertTrue(subscriptionTypeSet.contains(SubscriptionType.Failover));
assertFalse(subscriptionTypeSet.contains(SubscriptionType.Shared));
assertEquals(subscriptionTypeSet.size(), 1);
try {
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
fail();
} catch (PulsarClientException pulsarClientException) {
assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
}
// add shared type
subscriptionTypeSet.add(SubscriptionType.Shared);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().get()
.contains(CommandSubscribe.SubType.Shared));
});
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();
// test namespace and topic policy
subscriptionTypeSet.add(SubscriptionType.Shared);
admin.namespaces().setSubscriptionTypesEnabled(myNamespace, subscriptionTypeSet);
subscriptionTypeSet.clear();
subscriptionTypeSet.add(SubscriptionType.Failover);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().getTopicValue()
.contains(CommandSubscribe.SubType.Failover));
});
try {
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
fail();
} catch (PulsarClientException pulsarClientException) {
assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
}
//clear topic level setting, use ns setting only, which only contains shared.
admin.topicPolicies().setSubscriptionTypesEnabled(topic, Collections.emptySet());
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertNull(hierarchyTopicPolicies.getSubscriptionTypesEnabled().getTopicValue());
});
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();
}
@Test(timeOut = 20000)
public void testNonPersistentMaxConsumerOnSub() throws Exception {
int maxConsumerPerSubInBroker = 1;
int maxConsumerPerSubInNs = 2;
int maxConsumerPerSubInTopic = 3;
conf.setMaxConsumersPerSubscription(maxConsumerPerSubInBroker);
final String topic = "non-persistent://" + myNamespace + "/test-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
Producer producer = pulsarClient.newProducer().topic(topic).create();
final String subName = "my-sub";
ConsumerBuilder builder = pulsarClient.newConsumer()
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName).topic(topic);
Consumer consumer = builder.subscribe();
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("reached max consumers limit"));
}
// set namespace policy
admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerPerSubInNs);
Awaitility.await().untilAsserted(() -> {
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace));
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), maxConsumerPerSubInNs);
});
Consumer consumer2 = builder.subscribe();
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("reached max consumers limit"));
}
//set topic policy
admin.topicPolicies().setMaxConsumersPerSubscription(topic, maxConsumerPerSubInTopic);
Awaitility.await().untilAsserted(() -> {
assertNotNull(admin.topicPolicies().getMaxConsumersPerSubscription(topic));
assertEquals(admin.topicPolicies().getMaxConsumersPerSubscription(topic).intValue(), maxConsumerPerSubInTopic);
});
Consumer consumer3 = builder.subscribe();
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("reached max consumers limit"));
}
consumer.close();
consumer2.close();
consumer3.close();
producer.close();
}
@Test(timeOut = 20000)
public void testGetCompactionThresholdApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topicPolicies().getCompactionThreshold(topic));
assertNull(admin.namespaces().getCompactionThreshold(myNamespace));
long brokerPolicy = pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
assertEquals(admin.topicPolicies().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
long namespacePolicy = 10L;
admin.namespaces().setCompactionThreshold(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getCompactionThreshold(myNamespace)));
assertEquals(admin.topicPolicies().getCompactionThreshold(topic, true).longValue(), namespacePolicy);
long topicPolicy = 20L;
admin.topicPolicies().setCompactionThreshold(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topicPolicies().getCompactionThreshold(topic)));
assertEquals(admin.topicPolicies().getCompactionThreshold(topic, true).longValue(), topicPolicy);
admin.namespaces().removeCompactionThreshold(myNamespace);
admin.topicPolicies().removeCompactionThreshold(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topicPolicies().getCompactionThreshold(topic)));
assertEquals(admin.topicPolicies().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
}
@Test(timeOut = 30000)
public void testProduceConsumeOnTopicPolicy() {
final String msg = "send message ";
int numMsg = 10;
try {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(persistenceTopic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test").subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(persistenceTopic).create();
for (int i = 0; i < numMsg; ++i) {
producer.newMessage().value(msg + i).send();
}
for (int i = 0; i < numMsg; ++i) {
Message<String> message = consumer.receive(100, TimeUnit.MILLISECONDS);
Assert.assertEquals(message.getValue(), msg + i);
}
} catch (PulsarClientException e) {
log.error("Failed to send/produce message, ", e);
Assert.fail();
}
}
@Test(timeOut = 30000)
public void testSystemTopicShouldBeCompacted() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
admin.topicPolicies().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
log.info("Backlog quota set success on topic: {}", testTopic);
Awaitility.await()
.untilAsserted(() -> {
TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic);
Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction"));
});
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicPolicyEventsTopic);
long previousCompactedLedgerId = internalStats.compactedLedger.ledgerId;
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
pulsar.getBrokerService().checkCompaction();
Awaitility.await()
.untilAsserted(() -> {
PersistentTopicInternalStats iStats = admin.topics().getInternalStats(topicPolicyEventsTopic);
Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId);
});
}
@Test(timeOut = 30000)
public void testTopicRetentionPolicySetInManagedLedgerConfig() throws Exception {
RetentionPolicies nsRetentionPolicies = new RetentionPolicies(1, -1);
TopicName topicName = TopicName.get(testTopic);
// set and check retention policy on namespace level
admin.namespaces().setRetention(myNamespace, nsRetentionPolicies);
ManagedLedgerConfig managedLedgerConfig = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(nsRetentionPolicies.getRetentionTimeInMinutes()));
Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(), nsRetentionPolicies.getRetentionSizeInMB());
// set and check retention policy on topic level
RetentionPolicies topicRetentionPolicies = new RetentionPolicies(2, -1);
admin.topicPolicies().setRetention(testTopic, topicRetentionPolicies);
Awaitility.await().untilAsserted(() -> {
ManagedLedgerConfig config = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
Assert.assertEquals(config.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(topicRetentionPolicies.getRetentionTimeInMinutes()));
Assert.assertEquals(config.getRetentionSizeInMB(), topicRetentionPolicies.getRetentionSizeInMB());
});
}
@Test
public void testPolicyIsDeleteTogetherManually() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
int maxConsumersPerSubscription = 10;
admin.topicPolicies().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNotNull());
admin.topics().delete(topic);
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
}
@Test
public void testPoliciesCanBeDeletedWithTopic() throws Exception {
final String topic = testTopic + UUID.randomUUID();
final String topic2 = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
pulsarClient.newProducer().topic(topic2).create().close();
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull();
});
// Init Topic Policies. Send 4 messages in a row, there should be only 2 messages left after compression
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
admin.topicPolicies().setMaxConsumersPerSubscription(topic2, 2);
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 3);
admin.topicPolicies().setMaxConsumersPerSubscription(topic2, 4);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
});
String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
// Trigger compaction and make sure it is finished.
persistentTopic.triggerCompaction();
Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
field.setAccessible(true);
CompletableFuture<Long> future = (CompletableFuture<Long>)field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(future.isDone()));
Consumer consumer = pulsarClient.newConsumer()
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
int count = 0;
while (true) {
Message message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
count++;
consumer.acknowledge(message);
} else {
break;
}
}
consumer.close();
assertEquals(count, 2);
// Delete topic, there should be only 1 message left after compression
admin.topics().delete(topic, true);
Awaitility.await().untilAsserted(() ->
assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))));
persistentTopic.triggerCompaction();
field = PersistentTopic.class.getDeclaredField("currentCompaction");
field.setAccessible(true);
CompletableFuture<Long> future2 = (CompletableFuture<Long>)field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(future2.isDone()));
consumer = pulsarClient.newConsumer()
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
count = 0;
while (true) {
Message message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
count++;
consumer.acknowledge(message);
} else {
break;
}
}
consumer.close();
assertEquals(count, 1);
}
@Test
public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
int maxConsumersPerSubscription = 10;
admin.topicPolicies().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNotNull());
InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 3, true);
admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
Thread.sleep(4_000L);
pulsar.getBrokerService().checkGC();
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse());
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
}
@Test
public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
pulsar.getBrokerService().getTopics().forEach((k, v) -> {
TopicName topicName = TopicName.get(k);
assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
assertNull(NamespaceService.checkHeartbeatNamespaceV2(topicName.getNamespaceObject()));
});
}
@Test(timeOut = 30000)
public void testReplicatorClusterApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
assertNull(admin.topics().getReplicationClusters(topic, false));
List<String> clusters = Lists.newArrayList();
clusters.add("test");
admin.topics().setReplicationClusters(topic, clusters);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getReplicationClusters(topic, false), clusters));
admin.topics().removeReplicationClusters(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getReplicationClusters(topic, false)));
}
@Test
public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
int n = 0;
while (n < 2) {
n++;
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
});
admin.topics().delete(topic);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});
}
}
@Test
public void testGlobalTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() ->
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))
.isNull());
admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1, 2));
SystemTopicBasedTopicPoliciesService topicPoliciesService
= (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
// check global topic policies can be added correctly.
Awaitility.await().untilAsserted(() -> assertNotNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic), true)));
TopicPolicies topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true);
assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic)));
assertEquals(topicPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 1);
assertEquals(topicPolicies.getRetentionPolicies().getRetentionSizeInMB(), 2);
// check global topic policies can be updated correctly.
admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(3, 4));
Awaitility.await().untilAsserted(() -> {
TopicPolicies tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true);
assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic)));
assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 3);
assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 4);
});
//Local topic policies and global topic policies can exist together.
admin.topicPolicies().setRetention(topic, new RetentionPolicies(10, 20));
Awaitility.await().untilAsserted(() -> assertNotNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic))));
TopicPolicies tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true);
assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 3);
assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 4);
tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic));
assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 10);
assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 20);
// check remove global topic policies can be removed correctly.
admin.topicPolicies(true).removeRetention(topic);
Awaitility.await().untilAsserted(() ->
assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic), true).getRetentionPolicies()));
}
@Test
public void testMaxMessageSizeWithChunking() throws Exception {
this.conf.setMaxMessageSize(1000);
@Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(persistenceTopic)
.enableChunking(true)
.enableBatching(false)
.create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(persistenceTopic,false).join().get();
// send success when topic level maxMessage is not set.
producer.send(new byte[2000]);
admin.topicPolicies().setMaxMessageSize(persistenceTopic, 500);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals((int) topic.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(), 500);
});
// non-chunk message send success
producer.send(new byte[400]);
// chunk message send success
producer.send(new byte[2000]);
}
}