blob: f9e96758c66ac1e94f8be6ec58a6f8cd4cb23fe3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.*;
@Slf4j
@Test(groups = "broker")
public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest {
/**
 * Runs the base-class setup, then registers the {@code test} cluster and the
 * {@code max-unacked-messages} tenant that the namespaces used by the tests in this class
 * live under.
 *
 * @throws Exception if the base setup or any admin call fails
 */
@BeforeMethod
@Override
public void setup() throws Exception {
    super.internalSetup();

    // Cluster entry pointing at this broker's web service address.
    final ClusterData testCluster = new ClusterData(pulsar.getWebServiceAddress());
    admin.clusters().createCluster("test", testCluster);

    // Tenant with two admin roles, allowed on the "test" cluster only.
    final TenantInfo tenant = new TenantInfo(
            Sets.newHashSet("role1", "role2"),
            Sets.newHashSet("test"));
    admin.tenants().createTenant("max-unacked-messages", tenant);
}
/**
 * Runs the base-class cleanup and then calls {@code resetConfig()}.
 *
 * <p>The reset sits in a {@code finally} block so that it still happens when the base cleanup
 * throws. Tests in this class change the configuration (for example by enabling topic-level
 * policies), and those changes must not carry over into the next test method.
 *
 * @throws Exception if the base cleanup or the configuration reset fails
 */
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
    try {
        super.internalCleanup();
    } finally {
        resetConfig();
    }
}
/**
 * Checks that a namespace-level max-unacked-messages-per-consumer policy is applied to a
 * consumer that is already connected, and that setting the policy to 0 lifts the limit.
 *
 * @throws Exception if any client or admin call fails
 */
@Test(timeOut = 30000)
public void testNamespacePolicy() throws Exception {
    pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(3);

    final String namespace = "max-unacked-messages/policy-on-consumers";
    final String topic = "persistent://" + namespace + "/testNamespacePolicy";
    admin.namespaces().createNamespace(namespace);

    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .receiverQueueSize(1)
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName("sub")
            .topic(topic)
            .subscribe();

    // Namespace-level policy: at most one unacked message per consumer.
    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 1);
    final PersistentTopic brokerTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
    // Wait until the broker-side consumer reports the new limit.
    Awaitility.await().untilAsserted(() ->
            assertEquals(
                    brokerTopic.getSubscription("sub").getConsumers().get(0).getMaxUnackedMessages(),
                    1));

    // Publish more messages than the limit allows to be outstanding.
    final byte[] payload = "msg".getBytes();
    for (int sent = 0; sent < 20; sent++) {
        producer.send(payload);
    }

    // The first message is delivered; while it stays unacked, a second receive yields nothing.
    final Message<byte[]> delivered = consumer.receive(500, TimeUnit.MILLISECONDS);
    assertNotNull(delivered);
    final Message<byte[]> withheld = consumer.receive(500, TimeUnit.MILLISECONDS);
    assertNull(withheld);

    // Setting the policy to 0 disables the limit check.
    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 0);
    Awaitility.await().untilAsserted(() ->
            assertEquals(
                    brokerTopic.getSubscription("sub").getConsumers().get(0).getMaxUnackedMessages(),
                    0));

    consumer.acknowledge(delivered);
    final Message<byte[]> afterDisable = consumer.receive(500, TimeUnit.MILLISECONDS);
    assertNotNull(afterDisable);
}
/**
 * Checks that the namespace-level max-unacked-messages-per-consumer policy is {@code null} on a
 * newly created namespace and that a value written through the admin API is read back.
 *
 * @throws Exception if any admin call fails
 */
@Test
public void testMaxUnackedMessagesOnConsumers() throws Exception {
    final String namespace = "max-unacked-messages/default-on-consumers";
    final int limit = 2 * 50000;

    admin.namespaces().createNamespace(namespace);
    assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));

    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, limit);
    // TestNG's assertEquals takes (actual, expected); keeping that order makes a failure
    // message report the right value as "expected".
    assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), limit);
}
/**
 * Checks the set / get / remove round trip of the namespace-level
 * max-unacked-messages-per-subscription policy through the admin API.
 *
 * @throws Exception if any admin call fails
 */
@Test
public void testMaxUnackedMessagesOnSubscription() throws Exception {
    final String namespace = "max-unacked-messages/default-on-subscription";
    final int limit = 2 * 200000;

    admin.namespaces().createNamespace(namespace);
    assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));

    admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, limit);
    // TestNG's assertEquals takes (actual, expected); keeping that order makes a failure
    // message report the right value as "expected".
    Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
            -> assertEquals(
                    admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue(),
                    limit));

    admin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace);
    Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
            -> assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace)));
}
/**
 * Checks the precedence of the max-unacked-messages-per-consumer limit: a topic-level policy
 * wins over a namespace-level policy, which wins over the broker configuration.
 *
 * <p>The test sets all three levels to different values, then disables and removes the
 * topic-level and namespace-level policies one after another, checking each time how many
 * messages a shared consumer can receive without acknowledging.
 *
 * <p>Before every consume step that depends on a changed limit, the test waits until the
 * broker-side consumer reports the expected limit. The admin API returning the new policy
 * value does not by itself show that the connected consumer has picked it up.
 *
 * @throws Exception if any client or admin call fails
 */
@Test
public void testMaxUnackedMessagesPerConsumerPriority() throws Exception {
    final int brokerLevelPolicy = 3;
    final int namespaceLevelPolicy = 2;
    final int topicLevelPolicy = 1;

    // Redo the setup with topic-level policies enabled and a broker-level limit configured.
    cleanup();
    conf.setSystemTopicEnabled(true);
    conf.setTopicLevelPoliciesEnabled(true);
    conf.setMaxUnackedMessagesPerConsumer(brokerLevelPolicy);
    setup();

    final String namespace = "max-unacked-messages/priority-on-consumers";
    final String topic = "persistent://" + namespace + "/testMaxUnackedMessagesPerConsumerPriority";
    admin.namespaces().createNamespace(namespace);

    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
    for (int i = 0; i < 50; i++) {
        producer.send("msg".getBytes());
    }
    // Topic-level admin calls below need the topic policies cache to be initialized.
    Awaitility.await().until(()
            -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

    // Nothing is set at namespace or topic level yet.
    assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
    assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic));

    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, namespaceLevelPolicy);
    admin.topics().setMaxUnackedMessagesOnConsumer(topic, topicLevelPolicy);
    Awaitility.await().untilAsserted(()
            -> assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)));
    Awaitility.await().untilAsserted(()
            -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
    assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), namespaceLevelPolicy);
    assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), topicLevelPolicy);

    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionName("sub").topic(topic).receiverQueueSize(1).subscribe();
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
    org.apache.pulsar.broker.service.Consumer serverConsumer =
            persistentTopic.getSubscription("sub").getConsumers().get(0);

    // All three levels are set: the topic-level value applies.
    assertEquals(serverConsumer.getMaxUnackedMessages(), topicLevelPolicy);
    List<Message> msgs = consumeMsg(consumer, 3);
    assertEquals(msgs.size(), 1);

    //disable topic-level limiter
    admin.topics().setMaxUnackedMessagesOnConsumer(topic, 0);
    Awaitility.await().untilAsserted(()
            -> assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 0));
    ackMsgs(consumer, msgs);

    //remove topic-level policy, namespace-level should take effect
    admin.topics().removeMaxUnackedMessagesOnConsumer(topic);
    Awaitility.await().untilAsserted(() ->
            assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
    assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), namespaceLevelPolicy);
    Awaitility.await().untilAsserted(() ->
            assertEquals(serverConsumer.getMaxUnackedMessages(), namespaceLevelPolicy));
    msgs = consumeMsg(consumer, 5);
    assertEquals(msgs.size(), namespaceLevelPolicy);
    ackMsgs(consumer, msgs);

    //disable namespace-level limiter
    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 0);
    Awaitility.await().untilAsserted(()
            -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), 0));
    // Wait for the connected consumer to see the disabled limit before consuming.
    Awaitility.await().untilAsserted(()
            -> assertEquals(serverConsumer.getMaxUnackedMessages(), 0));
    msgs = consumeMsg(consumer, 5);
    assertEquals(msgs.size(), 5);
    ackMsgs(consumer, msgs);

    //remove namespace-level policy, broker-level should take effect
    admin.namespaces().removeMaxUnackedMessagesPerConsumer(namespace);
    Awaitility.await().untilAsserted(()
            -> assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)));
    // Wait for the connected consumer to fall back to the broker-level limit; otherwise it
    // could still be unlimited and receive all five messages.
    Awaitility.await().untilAsserted(()
            -> assertEquals(serverConsumer.getMaxUnackedMessages(), brokerLevelPolicy));
    msgs = consumeMsg(consumer, 5);
    assertEquals(msgs.size(), brokerLevelPolicy);
    ackMsgs(consumer, msgs);
}
/**
 * Receives up to {@code msgNum} messages from {@code consumer}, waiting at most 500 ms for each
 * one, and stops at the first receive call that returns {@code null}.
 *
 * @param consumer consumer to read from
 * @param msgNum maximum number of messages to receive
 * @return the received messages in arrival order; may hold fewer than {@code msgNum} entries
 * @throws Exception if a receive call fails
 */
private List<Message> consumeMsg(Consumer<?> consumer, int msgNum) throws Exception {
    final List<Message> received = new ArrayList<>();
    while (received.size() < msgNum) {
        final Message next = consumer.receive(500, TimeUnit.MILLISECONDS);
        if (next == null) {
            // Nothing arrived within the wait; return what was collected so far.
            return received;
        }
        received.add(next);
    }
    return received;
}
/**
 * Acknowledges every message in {@code msgs} on {@code consumer}, one at a time and in list
 * order.
 *
 * @param consumer consumer on which the messages are acknowledged
 * @param msgs messages to acknowledge
 * @throws Exception if an acknowledge call fails
 */
private void ackMsgs(Consumer<?> consumer, List<Message> msgs) throws Exception {
    final int total = msgs.size();
    for (int idx = 0; idx < total; idx++) {
        consumer.acknowledge(msgs.get(idx));
    }
}
}