blob: fab7a5e1e03e550dca53054f139cf92ab357e78d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.Data;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class NonPersistentTopicE2ETest extends BrokerTestBase {
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class);
@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Data
private static class Foo {
private String field1;
private String field2;
private int field3;
}
private Optional<Topic> getTopic(String topicName) {
return pulsar.getBrokerService().getTopicReference(topicName);
}
private boolean topicHasSchema(String topicName) {
String base = TopicName.get(topicName).getPartitionedTopicName();
String schemaName = TopicName.get(base).getSchemaName();
SchemaRegistry.SchemaAndMetadata result = pulsar.getSchemaRegistryService().getSchema(schemaName).join();
return result != null && !result.schema.isDeleted();
}
@Test
public void testGCWillDeleteSchema() throws Exception {
// 1. Simple successful GC
String topicName = "non-persistent://prop/ns-abc/topic-1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
Optional<Topic> topic = getTopic(topicName);
assertTrue(topic.isPresent());
byte[] data = JSONSchema.of(SchemaDefinition.builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema();
SchemaData schemaData = SchemaData.builder()
.data(data)
.type(SchemaType.BYTES)
.user("foo").build();
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
// 1a. Topic that add/removes subscription can be GC'd
topicName = "non-persistent://prop/ns-abc/topic-1a";
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
admin.topics().deleteSubscription(topicName, subName);
consumer.close();
runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
// 2. Topic is not GCed with live connection
topicName = "non-persistent://prop/ns-abc/topic-2";
subName = "sub1";
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
}
@Test
public void testPatternTopic() throws PulsarClientException, InterruptedException {
final String topic1 = "non-persistent://prop/ns-abc/testPatternTopic1-" + UUID.randomUUID().toString();
final String topic2 = "non-persistent://prop/ns-abc/testPatternTopic2-" + UUID.randomUUID().toString();
Pattern pattern = Pattern.compile("prop/ns-abc/test.*");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.subscriptionName("my-sub")
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic1)
.create();
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic2)
.create();
Thread.sleep(2000);
final int messages = 10;
for (int i = 0; i < messages; i++) {
producer1.send("Message sent by producer-1 -> " + i);
producer2.send("Message sent by producer-2 -> " + i);
}
for (int i = 0; i < messages * 2; i++) {
Message<String> received = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNotNull(received);
}
consumer.close();
producer1.close();
producer2.close();
}
@Test
public void testGC() throws Exception {
// 1. Simple successful GC
String topicName = "non-persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 5. Get the topic and make sure it doesn't come back
admin.lookups().lookupTopic(topicName);
Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
assertFalse(topic.isPresent());
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// write again, the topic will be available
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
producer2.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
}