blob: 6522a7ae9bebc8fc3590f0b584a0b2e69e900041 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.eclipse.jetty.http.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Tests for the topic-level message TTL admin API: set / get / remove, rejection of invalid values,
 * behaviour when topic-level policies are disabled, and precedence between the broker-, namespace-
 * and topic-level TTL settings.
 *
 * <p>Every test runs against a freshly started mocked broker (see {@link #setup()}) whose broker-level
 * default TTL is 3600 seconds.
 */
@Slf4j
public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {

    private final String testTenant = "my-tenant";

    private final String testNamespace = "my-namespace";

    private final String myNamespace = testTenant + "/" + testNamespace;

    private final String testTopic = "persistent://" + myNamespace + "/test-topic-message-ttl";

    /**
     * Starts the broker with system topics and topic-level policies enabled and a broker-level default
     * TTL of 3600 seconds, then creates the cluster, tenant, namespace and partitioned test topic.
     *
     * @throws Exception if the broker cannot be started or any admin call fails
     */
    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        resetConfig();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setTtlDurationDefaultInSeconds(3600);
        super.internalSetup();

        createTestResources();

        // Open and immediately close a producer on a throw-away topic in the test namespace.
        // NOTE(review): presumably this warms up the namespace's topic-policies machinery before the
        // tests start issuing topic-level policy calls -- confirm.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(myNamespace + "/" + "dummy-topic").create();
        producer.close();
        waitForZooKeeperWatchers();
    }

    /**
     * Shuts the mocked broker down after every test, even when the test failed.
     *
     * @throws Exception if the shutdown fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates the "test" cluster, the test tenant and namespace, and the two-partition test topic.
     * Shared by {@link #setup()} and by tests that restart the broker with a different configuration.
     */
    private void createTestResources() throws Exception {
        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        admin.tenants().createTenant(testTenant, tenantInfo);
        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
        admin.topics().createPartitionedTopic(testTopic, 2);
    }

    /**
     * Returns the {@code getMessageTTL} method declared on {@link PersistentTopic}, made accessible so
     * the tests can read the TTL the broker-side topic object reports.
     */
    private static Method messageTTLAccessor() throws NoSuchMethodException {
        Method accessor = PersistentTopic.class.getDeclaredMethod("getMessageTTL");
        accessor.setAccessible(true);
        return accessor;
    }

    /**
     * Sets a topic-level TTL, verifies it can be read back, removes it and verifies it reads as
     * {@code null} again.
     */
    @Test
    public void testSetThenRemoveMessageTTL() throws Exception {
        admin.topics().setMessageTTL(testTopic, 100);
        log.info("Message TTL set success on topic: {}", testTopic);

        // Poll instead of waiting a fixed amount of time: the write may not be visible to an immediate
        // read. The boxed comparison keeps a still-null value an assertion failure (which Awaitility
        // retries) rather than a NullPointerException.
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.topics().getMessageTTL(testTopic), Integer.valueOf(100)));
        Integer messageTTL = admin.topics().getMessageTTL(testTopic);
        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
        Assert.assertEquals(messageTTL.intValue(), 100);

        admin.topics().removeMessageTTL(testTopic);
        // The removal needs the same treatment as the set above before it can be asserted on.
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertNull(admin.topics().getMessageTTL(testTopic)));
        messageTTL = admin.topics().getMessageTTL(testTopic);
        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
        Assert.assertNull(messageTTL);
    }

    /**
     * Verifies that negative TTL values are rejected with HTTP 412 (Precondition Failed).
     */
    @Test
    public void testSetInvalidMessageTTL() throws Exception {
        try {
            admin.topics().setMessageTTL(testTopic, -100);
            Assert.fail("Setting a negative message TTL should be rejected");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), HttpStatus.PRECONDITION_FAILED_412);
        }

        try {
            // 2147483650L does not fit in an int: the narrowing cast wraps it to -2147483646, so what
            // reaches the admin API here is another negative value.
            admin.topics().setMessageTTL(testTopic, (int) 2147483650L);
            Assert.fail("Setting an overflowed (negative) message TTL should be rejected");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), HttpStatus.PRECONDITION_FAILED_412);
        }
    }

    /**
     * Verifies that the topic-level TTL is {@code null} until one is set, and that a value set
     * afterwards can be read back.
     */
    @Test
    public void testGetMessageTTL() throws Exception {
        // Check default topic level message TTL.
        Integer messageTTL = admin.topics().getMessageTTL(testTopic);
        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
        Assert.assertNull(messageTTL);

        admin.topics().setMessageTTL(testTopic, 200);
        log.info("Message TTL set success on topic: {}", testTopic);

        // Poll until the new value is readable; a null read is retried, not turned into an NPE.
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.topics().getMessageTTL(testTopic), Integer.valueOf(200)));
        messageTTL = admin.topics().getMessageTTL(testTopic);
        log.info("Message TTL {} get on topic: {}", messageTTL, testTopic);
        Assert.assertEquals(messageTTL.intValue(), 200);
    }

    /**
     * Restarts the broker with topic-level policies disabled and verifies that both reading and
     * setting the topic-level TTL fail with HTTP 405 (Method Not Allowed).
     */
    @Test
    public void testTopicPolicyDisabled() throws Exception {
        // Change the configuration first, then restart the broker so the new settings are picked up.
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(false);
        super.internalCleanup();
        super.internalSetup();

        createTestResources();

        try {
            admin.topics().getMessageTTL(testTopic);
            Assert.fail("Reading the topic-level TTL should fail while topic-level policies are disabled");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
        }

        try {
            admin.topics().setMessageTTL(testTopic, 200);
            Assert.fail("Setting the topic-level TTL should fail while topic-level policies are disabled");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
        }
    }

    /**
     * Verifies, by reading the TTL straight from the broker-side {@link PersistentTopic}, that a
     * namespace-level TTL (including an explicit 0) overrides the broker default of 3600 and that
     * removing it restores the broker default.
     */
    @Test(timeOut = 20000)
    public void testDifferentLevelPolicyPriority() throws Exception {
        final String topicName = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topicName);
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
        Method method = messageTTLAccessor();

        // Nothing is configured at namespace level yet, so the broker default applies.
        Integer namespaceMessageTTL = admin.namespaces().getNamespaceMessageTTL(myNamespace);
        Assert.assertNull(namespaceMessageTTL);
        Assert.assertEquals((int) method.invoke(persistentTopic), 3600);

        admin.namespaces().setNamespaceMessageTTL(myNamespace, 10);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 10));
        Assert.assertEquals((int) method.invoke(persistentTopic), 10);

        // An explicit namespace-level 0 is a real setting and must not fall back to the broker default.
        admin.namespaces().setNamespaceMessageTTL(myNamespace, 0);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 0));
        Assert.assertEquals((int) method.invoke(persistentTopic), 0);

        admin.namespaces().removeNamespaceMessageTTL(myNamespace);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertNull(admin.namespaces().getNamespaceMessageTTL(myNamespace)));
        Assert.assertEquals((int) method.invoke(persistentTopic), 3600);
    }

    /**
     * Verifies the "applied" TTL returned by {@code getMessageTTL(topic, true)}: broker default when
     * nothing is set, namespace value when only the namespace is configured, topic value when the topic
     * is configured, and broker default again once both overrides are removed.
     */
    @Test(timeOut = 20000)
    public void testDifferentLevelPolicyApplied() throws Exception {
        final String topicName = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topicName);
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
        Method method = messageTTLAccessor();

        //namespace-level default value is null
        Integer namespaceMessageTTL = admin.namespaces().getNamespaceMessageTTL(myNamespace);
        Assert.assertNull(namespaceMessageTTL);
        //topic-level default value is null
        Integer topicMessageTTL = admin.topics().getMessageTTL(topicName);
        Assert.assertNull(topicMessageTTL);
        //use broker-level by default
        int topicMessageTTLApplied = admin.topics().getMessageTTL(topicName, true);
        Assert.assertEquals(topicMessageTTLApplied, 3600);

        admin.namespaces().setNamespaceMessageTTL(myNamespace, 10);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 10));
        topicMessageTTLApplied = admin.topics().getMessageTTL(topicName, true);
        Assert.assertEquals(topicMessageTTLApplied, 10);

        admin.namespaces().setNamespaceMessageTTL(myNamespace, 0);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 0));
        topicMessageTTLApplied = admin.topics().getMessageTTL(topicName, true);
        Assert.assertEquals(topicMessageTTLApplied, 0);

        // The topic-level value takes precedence over the namespace-level one.
        admin.topics().setMessageTTL(topicName, 20);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertNotNull(admin.topics().getMessageTTL(topicName)));
        topicMessageTTLApplied = admin.topics().getMessageTTL(topicName, true);
        Assert.assertEquals(topicMessageTTLApplied, 20);

        // With both overrides gone the broker default is applied again.
        admin.namespaces().removeNamespaceMessageTTL(myNamespace);
        admin.topics().removeMessageTTL(topicName);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> Assert.assertEquals(admin.topics().getMessageTTL(topicName, true).intValue(), 3600));
        Assert.assertEquals((int) method.invoke(persistentTopic), 3600);
    }
}