blob: a2401ebe19a0641aaddfdfc815b99566e682cb58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.systopic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class PartitionedSystemTopicTest extends BrokerTestBase {
static final int PARTITIONS = 5;
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(false);
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setDefaultNumPartitions(PARTITIONS);
conf.setManagedLedgerMaxEntriesPerLedger(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setTransactionCoordinatorEnabled(true);
super.baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testAutoCreatedPartitionedSystemTopic() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
SystemTopicClient.Reader reader = systemTopicClientForNamespace.newReader();
int partitions = admin.topics().getPartitionedTopicMetadata(
String.format("persistent://%s/%s", ns, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(ns,
ListTopicsOptions.builder().includeSystemTopic(true).build());
Assert.assertEquals(partitionedTopicList.size(), 1);
Assert.assertEquals(partitions, PARTITIONS);
Assert.assertEquals(admin.topics().getList(ns, null,
ListTopicsOptions.builder().includeSystemTopic(true).build()).size(), PARTITIONS);
reader.close();
}
@Test(timeOut = 1000 * 60)
public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception {
String tenant = "tenant-" + RandomStringUtils.randomAlphabetic(4).toLowerCase();
admin.tenants().createTenant(tenant, new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
int namespaceCount = 30;
for (int i = 0; i < namespaceCount; i++) {
String ns = tenant + "/ns-" + i;
admin.namespaces().createNamespace(ns, 4);
String topic = ns + "/t1";
admin.topics().createPartitionedTopic(topic, 2);
}
List<CompletableFuture<Consumer<byte[]>>> futureList = new ArrayList<>();
for (int i = 0; i < namespaceCount; i++) {
String topic = tenant + "/ns-" + i + "/t1";
futureList.add(pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribeAsync());
}
FutureUtil.waitForAll(futureList).get();
// Close all the consumers after check
for (CompletableFuture<Consumer<byte[]>> consumer : futureList) {
consumer.join().close();
}
}
@Test
public void testProduceAndConsumeUnderSystemNamespace() throws Exception {
TenantInfo tenantInfo = TenantInfo
.builder()
.adminRoles(Sets.newHashSet("admin"))
.allowedClusters(Sets.newHashSet("test"))
.build();
admin.tenants().createTenant("pulsar", tenantInfo);
admin.namespaces().createNamespace("pulsar/system", 2);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic("pulsar/system/__topic-1").create();
producer.send("test".getBytes(StandardCharsets.UTF_8));
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic("pulsar/system/__topic-1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Message<byte[]> receive = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(receive);
}
@Test
public void testHealthCheckTopicNotOffload() throws Exception {
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName.toString(), true).get().get();
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
config.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
admin.brokers().healthcheck(TopicVersion.V2);
admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
});
LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);
Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
}
@Test
public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
Optional<Topic> optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());
TopicName heartbeatTopicName = TopicName.get("persistent",
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
admin.topics().getRetention(heartbeatTopicName.toString());
optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());
}
@Test
public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
for (int partition = 0; partition < PARTITIONS; partition ++) {
pulsar.getBrokerService()
.getTopic(topicName.getPartition(partition).toString(), true).join();
}
Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {
admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
});
}
@Test
public void testHeartbeatTopicBeDeleted() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 1);
Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
admin.topics().delete(heartbeatTopicName.toString(), true);
topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 0);
}
@Test
public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent",
namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
Optional<Topic> optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());
List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 1);
TopicName heartbeatTopicName = TopicName.get("persistent",
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
}
@Test
public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
BacklogQuota quota = BacklogQuota.builder()
.limitTime(2)
.limitSize(-1)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
String partition0 = TopicName.get(String.format("persistent://%s", topic)).getPartition(0).toString();
Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(partition0);
Assert.assertTrue(topicReference.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
persistentTopic.getManagedLedger().setConfig(config);
MethodUtils.invokeMethod(
persistentTopic.getManagedLedger(),
true,
"updateLastLedgerCreatedTimeAndScheduleRolloverTask");
String msg1 = "msg-1";
producer.send(msg1);
Thread.sleep(3 * 1000);
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub-1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Message<String> receive = consumer2.receive();
consumer2.acknowledge(receive);
Thread.sleep(3 * 1000);
try {
@Cleanup
PulsarClient pulsarClient1 = PulsarClient.builder()
.maxBackoffInterval(3, TimeUnit.SECONDS)
.operationTimeout(5, TimeUnit.SECONDS)
.serviceUrl(lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build();
Producer<String> producerN = pulsarClient1
.newProducer(Schema.STRING).topic(topic).sendTimeout(3, TimeUnit.SECONDS).create();
Assert.assertTrue(producerN.isConnected());
producerN.close();
} catch (Exception ex) {
Assert.fail("failed to create producer");
}
}
@Test
public void testSystemTopicNotCheckExceed() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
conf.setMaxSameAddressProducersPerTopic(1);
admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
Assert.assertTrue(reader1.hasMoreEvents());
Assert.assertNotNull(reader1.readNext());
Assert.assertTrue(reader2.hasMoreEvents());
Assert.assertNotNull(reader2.readNext());
reader1.close();
reader2.close();
writer1.get().close();
writer2.get().close();
}
@Test
public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
admin.topicPolicies().setMaxConsumers(topicName, 2);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
CompletableFuture<Optional<Topic>> topic = pulsar.getBrokerService().getTopic(topicName, false);
PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
persistentTopic.close();
admin.topics().delete(topicName);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
assertNull(topicPolicies);
String base = TopicName.get(topicName).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = pulsar.getSchemaRegistryService().getSchema(id);
assertNull(schema.join());
}
}