/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.net.URL;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Tests backlog quota enforcement in the Pulsar broker across the different retention policies.
 */
public class BacklogQuotaManagerTest {
PulsarService pulsar;
ServiceConfiguration config;
URL adminUrl;
PulsarAdmin admin;
LocalBookkeeperEnsemble bkEnsemble;
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
private static final int MAX_ENTRIES_PER_LEDGER = 5;
@BeforeMethod
void setup() throws Exception {
try {
// start local bookie and zookeeper
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
// start pulsar service
config = new ServiceConfiguration();
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(0));
            config.setClusterName("usc");
            config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType("non-partitioned");
pulsar = new PulsarService(config);
pulsar.start();
adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get());
admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build();
admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
admin.tenants().createTenant("prop",
new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("usc")));
admin.namespaces().createNamespace("prop/ns-quota");
admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc"));
admin.namespaces().createNamespace("prop/quotahold");
admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc"));
admin.namespaces().createNamespace("prop/quotaholdasync");
admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc"));
} catch (Throwable t) {
LOG.error("Error setting up broker test", t);
fail("Broker test setup failed");
}
}
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
try {
admin.close();
pulsar.close();
bkEnsemble.stop();
} catch (Throwable t) {
LOG.error("Error cleaning up broker test setup state", t);
fail("Broker test cleanup failed");
}
}
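    /**
     * Forces the broker to refresh its topic stats so that subsequent
     * {@code getStats} calls reflect the latest backlog figures.
     */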
private void rolloverStats() {
pulsar.getBrokerService().updateRates();
}
    /**
     * Readers should not affect the backlog quota.
     */
@Test
public void testBacklogQuotaWithReader() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0, TimeUnit.SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic1";
final int numMsgs = 20;
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
            Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                content[0] = (byte) (content[0] + 1);
                producer.send(content);
            }
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
// overall backlogSize should be zero because we only have readers
assertEquals(stats.backlogSize, 0, "backlog size is [" + stats.backlogSize + "]");
            // the reader's non-durable subscription should still report a backlog for the entries that were not trimmed
assertEquals(stats.subscriptions.size(), 1);
long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
try {
                // send more messages; the producer should not be blocked because readers do not retain a backlog
                for (int i = 0; i < numMsgs; i++) {
                    content[0] = (byte) (content[0] + 1);
                    producer.send(content);
                }
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
}
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic1, false);
// check there is only one ledger left
            // TODO: in theory there shouldn't be any ledgers left if we are using readers.
            //  However, trimming of ledgers is piggybacked onto ledger operations,
            //  so if no new data is coming in, trimming never occurs.
            //  We need to trigger trimming on a schedule to actually delete all remaining ledgers.
assertEquals(internalStats.ledgers.size(), 1);
            // check that it is the expected ledger id given MAX_ENTRIES_PER_LEDGER
assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
            // check the reader can still read without error
while (true) {
Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
if (msg == null) {
break;
}
LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
}
}
}
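    /**
     * Same scenario as {@link #testBacklogQuotaWithReader()}, but the backlog quota check is
     * triggered explicitly through the admin API instead of waiting for the scheduled check.
     */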
@Test
public void testTriggerBacklogQuotaWithReader() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                .statsInterval(0, TimeUnit.SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
final int numMsgs = 20;
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
producer.send(content);
}
admin.brokers().backlogQuotaCheck();
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
// overall backlogSize should be zero because we only have readers
assertEquals(stats.backlogSize, 0, "backlog size is [" + stats.backlogSize + "]");
            // the reader's non-durable subscription should still report a backlog for the entries that were not trimmed
assertEquals(stats.subscriptions.size(), 1);
long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
try {
                // send more messages; the producer should not be blocked because readers do not retain a backlog
for (int i = 0; i < numMsgs; i++) {
content[0] = (byte) (content[0] + 1);
producer.send(content);
}
} catch (PulsarClientException ce) {
fail("Should not have gotten exception: " + ce.getMessage());
}
// make sure ledgers are trimmed
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false);
// check there is only one ledger left
assertEquals(internalStats.ledgers.size(), 1);
            // check that it is the expected ledger id given MAX_ENTRIES_PER_LEDGER
assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1);
            // check the reader can still read without error
while (true) {
Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
if (msg == null) {
break;
}
LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
}
producer.close();
reader.close();
}
}
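    /**
     * With the {@code consumer_backlog_eviction} policy, the broker should trim the backlog
     * below the quota once the scheduled backlog quota check runs.
     */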
@Test
public void testConsumerBacklogEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
final String topic1 = "persistent://prop/ns-quota/topic1";
final String subName1 = "c1";
final String subName2 = "c2";
final int numMsgs = 20;
Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
        assertTrue(stats.backlogSize < 10 * 1024, "Backlog size is [" + stats.backlogSize + "]");
client.close();
}
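    /**
     * Eviction should keep the backlog within the quota even when only one of the two
     * subscriptions acknowledges its messages.
     */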
@Test
public void testConsumerBacklogEvictionWithAck() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
final String topic1 = "persistent://prop/ns-quota/topic11";
final String subName1 = "c11";
final String subName2 = "c21";
final int numMsgs = 20;
Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
// only one consumer acknowledges the message
consumer1.acknowledge(consumer1.receive());
consumer2.receive();
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
        assertTrue(stats.backlogSize <= 10 * 1024, "Backlog size is [" + stats.backlogSize + "]");
client.close();
}
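    /**
     * Acknowledging messages while the broker concurrently evicts the backlog should not
     * surface client errors, and the backlog should end up within the quota.
     */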
@Test
public void testConcurrentAckAndEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
final String topic1 = "persistent://prop/ns-quota/topic12";
final String subName1 = "c12";
final String subName2 = "c22";
final int numMsgs = 20;
final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer1 = client2.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
Thread producerThread = new Thread() {
public void run() {
try {
barrier.await();
                    Producer<byte[]> producer = client.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}
producer.close();
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
        Thread consumerThread = new Thread() {
public void run() {
try {
barrier.await();
for (int i = 0; i < numMsgs; i++) {
// only one consumer acknowledges the message
consumer1.acknowledge(consumer1.receive());
consumer2.receive();
}
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
producerThread.start();
        consumerThread.start();
// test hangs without timeout since there is nothing to consume due to eviction
counter.await(20, TimeUnit.SECONDS);
assertFalse(gotException.get());
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
        assertTrue(stats.backlogSize <= 10 * 1024, "Backlog size is [" + stats.backlogSize + "]");
client.close();
client2.close();
}
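    /**
     * When every subscription acknowledges all messages, producing and consuming should
     * complete without errors since there is nothing to evict.
     */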
@Test
public void testNoEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
final String topic1 = "persistent://prop/ns-quota/topic13";
final String subName1 = "c13";
final String subName2 = "c23";
final int numMsgs = 10;
final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
final Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
final PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
Thread producerThread = new Thread() {
public void run() {
try {
barrier.await();
                    Producer<byte[]> producer = client2.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}
producer.close();
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
        Thread consumerThread = new Thread() {
public void run() {
try {
barrier.await();
for (int i = 0; i < numMsgs; i++) {
consumer1.acknowledge(consumer1.receive());
consumer2.acknowledge(consumer2.receive());
}
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
producerThread.start();
        consumerThread.start();
counter.await();
assertFalse(gotException.get());
client.close();
client2.close();
}
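    /**
     * Two producers and two acknowledging consumers run concurrently; the backlog should
     * still end up within the configured quota after the scheduled check runs.
     */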
@Test
public void testEvictionMulti() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(15 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
final String topic1 = "persistent://prop/ns-quota/topic14";
final String subName1 = "c14";
final String subName2 = "c24";
final int numMsgs = 10;
final CyclicBarrier barrier = new CyclicBarrier(4);
final CountDownLatch counter = new CountDownLatch(4);
final AtomicBoolean gotException = new AtomicBoolean(false);
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
final Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
final PulsarClient client3 = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
Thread producerThread1 = new Thread() {
public void run() {
try {
barrier.await();
                    Producer<byte[]> producer = client2.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}
producer.close();
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
Thread producerThread2 = new Thread() {
public void run() {
try {
barrier.await();
                    Producer<byte[]> producer = client3.newProducer().topic(topic1).create();
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}
producer.close();
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
        Thread consumerThread1 = new Thread() {
public void run() {
try {
barrier.await();
for (int i = 0; i < numMsgs * 2; i++) {
consumer1.acknowledge(consumer1.receive());
}
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
        Thread consumerThread2 = new Thread() {
public void run() {
try {
barrier.await();
for (int i = 0; i < numMsgs * 2; i++) {
consumer2.acknowledge(consumer2.receive());
}
} catch (Exception e) {
gotException.set(true);
} finally {
counter.countDown();
}
}
};
producerThread1.start();
producerThread2.start();
        consumerThread1.start();
        consumerThread2.start();
counter.await(20, TimeUnit.SECONDS);
assertFalse(gotException.get());
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
        assertTrue(stats.backlogSize <= 15 * 1024, "Backlog size is [" + stats.backlogSize + "]");
client.close();
client2.close();
client3.close();
}
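    /**
     * With the {@code producer_request_hold} policy, a producer that exceeds the quota is put
     * on hold and eventually disconnected, so the topic should report no publishers.
     */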
@Test
public void testAheadProducerOnHold() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final String topic1 = "persistent://prop/quotahold/hold";
final String subName1 = "c1hold";
final int numMsgs = 10;
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
byte[] content = new byte[1024];
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
for (int i = 0; i <= numMsgs; i++) {
try {
producer.send(content);
LOG.info("sent [{}]", i);
} catch (PulsarClientException.TimeoutException cte) {
                // once the quota is exceeded the broker puts the producer on hold, which may cause a send timeout
LOG.info("timeout on [{}]", i);
}
}
for (int i = 0; i < numMsgs; i++) {
consumer.receive();
LOG.info("received [{}]", i);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
        assertEquals(stats.publishers.size(), 0,
                "number of producers on topic " + topic1 + " is [" + stats.publishers.size() + "]");
client.close();
}
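    /**
     * With {@code producer_request_hold} and no consumer progress, sends past the quota should
     * block until the client-side send timeout fires.
     */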
@Test
public void testAheadProducerOnHoldTimeout() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final String topic1 = "persistent://prop/quotahold/holdtimeout";
final String subName1 = "c1holdtimeout";
boolean gotException = false;
client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
byte[] content = new byte[1024];
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
for (int i = 0; i < 10; i++) {
producer.send(content);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
try {
// try to send over backlog quota and make sure it fails
producer.send(content);
producer.send(content);
fail("backlog quota did not exceed");
} catch (PulsarClientException.TimeoutException te) {
gotException = true;
}
assertTrue(gotException, "timeout did not occur");
client.close();
}
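    /**
     * With the {@code producer_exception} policy, sending past the quota should fail with a
     * {@code ProducerBlockedQuotaExceededException} (or a timeout, depending on timing).
     */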
@Test
public void testProducerException() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final String topic1 = "persistent://prop/quotahold/except";
final String subName1 = "c1except";
boolean gotException = false;
client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
byte[] content = new byte[1024];
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
for (int i = 0; i < 10; i++) {
producer.send(content);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
try {
// try to send over backlog quota and make sure it fails
producer.send(content);
producer.send(content);
fail("backlog quota did not exceed");
} catch (PulsarClientException ce) {
assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
|| ce instanceof PulsarClientException.TimeoutException, ce.getMessage());
gotException = true;
}
assertTrue(gotException, "backlog exceeded exception did not occur");
client.close();
}
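    /**
     * After the producer is blocked by the quota, draining the backlog by acknowledging all
     * messages should unblock it so that publishing succeeds again.
     */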
@Test
public void testProducerExceptionAndThenUnblock() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final String topic1 = "persistent://prop/quotahold/exceptandunblock";
final String subName1 = "c1except";
boolean gotException = false;
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
byte[] content = new byte[1024];
Producer<byte[]> producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create();
for (int i = 0; i < 10; i++) {
producer.send(content);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
try {
// try to send over backlog quota and make sure it fails
producer.send(content);
producer.send(content);
fail("backlog quota did not exceed");
} catch (PulsarClientException ce) {
assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
|| ce instanceof PulsarClientException.TimeoutException, ce.getMessage());
gotException = true;
}
assertTrue(gotException, "backlog exceeded exception did not occur");
        // now drain the backlog and ensure that the producer is unblocked
        rolloverStats();
TopicStats stats = admin.topics().getStats(topic1);
int backlog = (int) stats.subscriptions.get(subName1).msgBacklog;
for (int i = 0; i < backlog; i++) {
Message<?> msg = consumer.receive();
consumer.acknowledge(msg);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
// publish should work now
Exception sendException = null;
gotException = false;
try {
for (int i = 0; i < 5; i++) {
producer.send(content);
}
} catch (Exception e) {
gotException = true;
sendException = e;
}
assertFalse(gotException, "unable to publish due to " + sendException);
client.close();
}
private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
}