| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| import com.beust.jcommander.internal.Maps; |
| import com.google.common.collect.Sets; |
| import java.net.URL; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import lombok.Cleanup; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Reader; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; |
| import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import org.awaitility.Awaitility; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "broker") |
| public class BacklogQuotaManagerTest { |
| PulsarService pulsar; |
| ServiceConfiguration config; |
| |
| URL adminUrl; |
| PulsarAdmin admin; |
| |
| LocalBookkeeperEnsemble bkEnsemble; |
| |
| private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3; |
| private static final int MAX_ENTRIES_PER_LEDGER = 5; |
| |
| @DataProvider(name = "backlogQuotaSizeGB") |
| public Object[][] backlogQuotaSizeGB() { |
| return new Object[][] { { true }, { false } }; |
| } |
| |
| @BeforeMethod |
| void setup() throws Exception { |
| try { |
| // start local bookie and zookeeper |
| bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble.start(); |
| |
| // start pulsar service |
| config = new ServiceConfiguration(); |
| config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); |
| config.setAdvertisedAddress("localhost"); |
| config.setWebServicePort(Optional.of(0)); |
| config.setClusterName("usc"); |
| config.setBrokerShutdownTimeoutMs(0L); |
| config.setBrokerServicePort(Optional.of(0)); |
| config.setAuthorizationEnabled(false); |
| config.setAuthenticationEnabled(false); |
| config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); |
| config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); |
| config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); |
| config.setAllowAutoTopicCreationType("non-partitioned"); |
| |
| pulsar = new PulsarService(config); |
| pulsar.start(); |
| |
| adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get()); |
| admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build(); |
| |
| admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl(adminUrl.toString()).build()); |
| admin.tenants().createTenant("prop", |
| new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("usc"))); |
| admin.namespaces().createNamespace("prop/ns-quota"); |
| admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc")); |
| admin.namespaces().createNamespace("prop/quotahold"); |
| admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc")); |
| admin.namespaces().createNamespace("prop/quotaholdasync"); |
| admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc")); |
| } catch (Throwable t) { |
| LOG.error("Error setting up broker test", t); |
| fail("Broker test setup failed"); |
| } |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| void shutdown() throws Exception { |
| try { |
| if (admin != null) { |
| admin.close(); |
| admin = null; |
| } |
| if (pulsar != null) { |
| pulsar.close(); |
| pulsar = null; |
| } |
| if (bkEnsemble != null) { |
| bkEnsemble.stop(); |
| bkEnsemble = null; |
| } |
| } catch (Throwable t) { |
| LOG.error("Error cleaning up broker test setup state", t); |
| fail("Broker test cleanup failed"); |
| } |
| } |
| |
| private void rolloverStats() { |
| pulsar.getBrokerService().updateRates(); |
| } |
| |
| /** |
| * Readers should not effect backlog quota |
| */ |
| @Test |
| public void testBacklogQuotaWithReader() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
| .build()); |
| try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { |
| final String topic1 = "persistent://prop/ns-quota/topic1"; |
| final int numMsgs = 20; |
| |
| Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); |
| |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| MessageId msgId = producer.send(content); |
| } |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
| rolloverStats(); |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| |
| // overall backlogSize should be zero because we only have readers |
| assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]"); |
| |
| // non-durable mes should still |
| assertEquals(stats.getSubscriptions().size(), 1); |
| long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); |
| assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, |
| "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); |
| |
| try { |
| // try to send over backlog quota and make sure it fails |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| MessageId msgId = producer.send(content); |
| } |
| } catch (PulsarClientException ce) { |
| fail("Should not have gotten exception: " + ce.getMessage()); |
| } |
| |
| // TODO in theory there shouldn't be any ledgers left if we are using readers. |
| // However, trimming of ledgers are piggy packed onto ledger operations. |
| // So if there isn't new data coming in, trimming never occurs. |
| // We need to trigger trimming on a schedule to actually delete all remaining ledgers |
| Awaitility.await().untilAsserted(() -> { |
| // make sure ledgers are trimmed |
| PersistentTopicInternalStats internalStats = |
| admin.topics().getInternalStats(topic1, false); |
| |
| // check there is only one ledger left |
| assertEquals(internalStats.ledgers.size(), 1); |
| |
| // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER |
| assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); |
| }); |
| |
| // check reader can still read with out error |
| |
| while (true) { |
| Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS); |
| if (msg == null) { |
| break; |
| } |
| LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]); |
| } |
| } |
| } |
| |
| @Test |
| public void testTriggerBacklogQuotaSizeWithReader() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
| .build()); |
| try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { |
| final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID(); |
| final int numMsgs = 20; |
| Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); |
| Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| producer.send(content); |
| } |
| Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000); |
| admin.brokers().backlogQuotaCheck(); |
| rolloverStats(); |
| TopicStats stats = admin.topics().getStats(topic1); |
| // overall backlogSize should be zero because we only have readers |
| assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]"); |
| // non-durable mes should still |
| assertEquals(stats.getSubscriptions().size(), 1); |
| long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); |
| assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER, |
| "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ; |
| try { |
| // try to send over backlog quota and make sure it fails |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| producer.send(content); |
| } |
| } catch (PulsarClientException ce) { |
| fail("Should not have gotten exception: " + ce.getMessage()); |
| } |
| |
| Awaitility.await().untilAsserted(() -> { |
| // make sure ledgers are trimmed |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); |
| |
| // check there is only one ledger left |
| assertEquals(internalStats.ledgers.size(), 1); |
| |
| // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER |
| assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); |
| }); |
| // check reader can still read with out error |
| |
| while (true) { |
| Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS); |
| if (msg == null) { |
| break; |
| } |
| LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]); |
| } |
| producer.close(); |
| reader.close(); |
| } |
| } |
| |
| /** |
| * Time based backlog quota won't affect reader since broker doesn't keep track of consuming position for reader |
| * and can't do message age check against the quota. |
| * @throws Exception |
| */ |
| @Test |
| public void testTriggerBacklogTimeQuotaWithReader() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
| .build()); |
| try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { |
| final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); |
| final int numMsgs = 9; |
| Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); |
| Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| producer.send(content); |
| } |
| Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000); |
| admin.brokers().backlogQuotaCheck(); |
| rolloverStats(); |
| TopicStats stats = admin.topics().getStats(topic1); |
| // overall backlogSize should be zero because we only have readers |
| assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]"); |
| // non-durable mes should still |
| assertEquals(stats.getSubscriptions().size(), 1); |
| long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog(); |
| // non-durable subscription won't trigger the check for time based backlog quota |
| // and cause back pressure action to be token. Since broker don't keep track consuming position for reader. |
| assertEquals(nonDurableSubscriptionBacklog, numMsgs, |
| "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); |
| |
| Awaitility.await() |
| .pollDelay(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA)) |
| .pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { |
| // make sure ledgers are trimmed |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); |
| |
| // check that there are 2 ledgers |
| assertEquals(internalStats.ledgers.size(), 2); |
| }); |
| |
| try { |
| // try to send over backlog quota and make sure it fails |
| for (int i = 0; i < numMsgs; i++) { |
| content[0] = (byte) (content[0] + 1); |
| producer.send(content); |
| } |
| } catch (PulsarClientException ce) { |
| fail("Should not have gotten exception: " + ce.getMessage()); |
| } |
| |
| // check reader can still read without error |
| while (true) { |
| Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS); |
| if (msg == null) { |
| break; |
| } |
| LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]); |
| } |
| producer.close(); |
| reader.close(); |
| } |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionSizeQuota() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build()); |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic2"; |
| final String subName1 = "c1"; |
| final String subName2 = "c2"; |
| final int numMsgs = 20; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| consumer1.receive(); |
| consumer2.receive(); |
| } |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| rolloverStats(); |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build(), BacklogQuota.BacklogQuotaType.message_age); |
| config.setPreciseTimeBasedBacklogQuotaCheck(true); |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic3"; |
| final String subName1 = "c1"; |
| final String subName2 = "c2"; |
| final int numMsgs = 9; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| consumer1.receive(); |
| consumer2.receive(); |
| } |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 9); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9); |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
| rolloverStats(); |
| |
| stats = admin.topics().getStats(topic1); |
| // All messages for both subscription should be cleaned up from backlog by backlog monitor task. |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0); |
| client.close(); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionTimeQuota() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build(), BacklogQuota.BacklogQuotaType.message_age); |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic3"; |
| final String subName1 = "c1"; |
| final String subName2 = "c2"; |
| final int numMsgs = 14; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| consumer1.receive(); |
| consumer2.receive(); |
| } |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
| rolloverStats(); |
| |
| stats = admin.topics().getStats(topic1); |
| // Messages on first 2 ledgers should be expired, backlog is number of |
| // message in current ledger which should be 4. |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 4); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 4); |
| client.close(); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build(), BacklogQuota.BacklogQuotaType.message_age); |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| |
| final String topic = "persistent://prop/ns-quota/topic4"; |
| final String subName = "c1"; |
| |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subName).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic); |
| producer.send(new byte[1024]); |
| consumer.receive(); |
| |
| admin.topics().unload(topic); |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic); |
| assertEquals(internalStats.ledgers.size(), 2); |
| assertEquals(internalStats.ledgers.get(1).entries, 0); |
| |
| TopicStats stats = admin.topics().getStats(topic); |
| assertEquals(stats.getSubscriptions().get(subName).getMsgBacklog(), 1); |
| |
| TimeUnit.SECONDS.sleep(TIME_TO_CHECK_BACKLOG_QUOTA); |
| |
| Awaitility.await() |
| .pollInterval(Duration.ofSeconds(1)) |
| .atMost(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA)) |
| .untilAsserted(() -> { |
| rolloverStats(); |
| |
| // Cause the last ledger is empty, it is not possible to skip first ledger, |
| // so the number of ledgers will keep unchanged, and backlog is clear |
| PersistentTopicInternalStats latestInternalStats = admin.topics().getInternalStats(topic); |
| assertEquals(latestInternalStats.ledgers.size(), 2); |
| assertEquals(latestInternalStats.ledgers.get(1).entries, 0); |
| TopicStats latestStats = admin.topics().getStats(topic); |
| assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0); |
| }); |
| |
| client.close(); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build()); |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic11"; |
| final String subName1 = "c11"; |
| final String subName2 = "c21"; |
| final int numMsgs = 20; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| // only one consumer acknowledges the message |
| consumer1.acknowledge(consumer1.receive()); |
| consumer2.receive(); |
| } |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| rolloverStats(); |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertTrue(stats.getBacklogSize() <= 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build(), BacklogQuota.BacklogQuotaType.message_age); |
| config.setPreciseTimeBasedBacklogQuotaCheck(true); |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic12"; |
| final String subName1 = "c11"; |
| final String subName2 = "c21"; |
| final int numMsgs = 9; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| consumer1.receive(); |
| consumer2.receive(); |
| } |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 9); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9); |
| |
| consumer1.redeliverUnacknowledgedMessages(); |
| for (int i = 0; i < numMsgs; i++) { |
| // only one consumer acknowledges the message |
| consumer1.acknowledge(consumer1.receive()); |
| } |
| |
| Thread.sleep(1000); |
| rolloverStats(); |
| stats = admin.topics().getStats(topic1); |
| // sub1 has empty backlog as it acked all messages |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 9); |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
| rolloverStats(); |
| |
| stats = admin.topics().getStats(topic1); |
| // sub2 has empty backlog because it's backlog get cleaned up by backlog quota monitor task |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0); |
| client.close(); |
| } |
| |
| private Producer<byte[]> createProducer(PulsarClient client, String topic) |
| throws PulsarClientException { |
| return client.newProducer() |
| .enableBatching(false) |
| .sendTimeout(2, TimeUnit.SECONDS) |
| .topic(topic) |
| .create(); |
| } |
| |
| @Test |
| public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build(), BacklogQuota.BacklogQuotaType.message_age); |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic12"; |
| final String subName1 = "c11"; |
| final String subName2 = "c21"; |
| final int numMsgs = 14; |
| |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| |
| List<Message<byte[]>> messagesToAcknowledge = new ArrayList<>(); |
| |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| messagesToAcknowledge.add(consumer1.receive()); |
| consumer2.receive(); |
| } |
| |
| { |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); |
| } |
| |
| for (int i = 0; i < numMsgs; i++) { |
| // pause before acknowledging the 11. message so that 2 first ledgers (5 msgs/ledger) will expire before the |
| // last ledger |
| if (i == 10) { |
| Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000L); |
| } |
| // only one consumer acknowledges the message |
| consumer1.acknowledge(messagesToAcknowledge.get(i)); |
| } |
| |
| Awaitility.await() |
| .pollInterval(Duration.ofSeconds(1)) |
| .untilAsserted(() -> { |
| rolloverStats(); |
| TopicStats stats = admin.topics().getStats(topic1); |
| // sub1 has empty backlog as it acked all messages |
| assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); |
| assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); |
| }); |
| |
| Awaitility.await() |
| .pollInterval(Duration.ofSeconds(1)) |
| .atMost(Duration.ofSeconds(4 * TIME_TO_CHECK_BACKLOG_QUOTA)) |
| .untilAsserted(() -> { |
| // Messages on first 2 ledgers should be expired, backlog is number of |
| // message in current ledger which should be 4. |
| long msgBacklog = admin.topics().getStats(topic1).getSubscriptions().get(subName2).getMsgBacklog(); |
| // TODO: for some reason the backlog size is sometimes off by one |
| // Internally there's a method `long getNumberOfEntriesInBacklog(boolean getPreciseBacklog)` |
| // on org.apache.pulsar.broker.service.Subscription interface |
| // the `boolean getPreciseBacklog` parameter indicates that the backlog size isn't accurate |
| assertEquals(msgBacklog, 4, 1); |
| }); |
| } |
| |
| @Test |
| public void testConcurrentAckAndEviction() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/ns-quota", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
| .build()); |
| |
| final String topic1 = "persistent://prop/ns-quota/topic12"; |
| final String subName1 = "c12"; |
| final String subName2 = "c22"; |
| final int numMsgs = 20; |
| |
| final CyclicBarrier barrier = new CyclicBarrier(2); |
| final CountDownLatch counter = new CountDownLatch(2); |
| final AtomicBoolean gotException = new AtomicBoolean(false); |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| @Cleanup |
| PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| Consumer<byte[]> consumer1 = client2.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| Consumer<byte[]> consumer2 = client2.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
| |
| Thread producerThread = new Thread() { |
| public void run() { |
| try { |
| barrier.await(); |
| org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); |
| byte[] content = new byte[1024]; |
| for (int i = 0; i < numMsgs; i++) { |
| producer.send(content); |
| } |
| producer.close(); |
| } catch (Exception e) { |
| gotException.set(true); |
| } finally { |
| counter.countDown(); |
| } |
| } |
| }; |
| |
| Thread consumerThread = new Thread() { |
| public void run() { |
| try { |
| barrier.await(); |
| for (int i = 0; i < numMsgs; i++) { |
| // only one consumer acknowledges the message |
| consumer1.acknowledge(consumer1.receive()); |
| consumer2.receive(); |
| } |
| } catch (Exception e) { |
| gotException.set(true); |
| } finally { |
| counter.countDown(); |
| } |
| } |
| }; |
| |
| producerThread.start(); |
| consumerThread.start(); |
| |
| // test hangs without timeout since there is nothing to consume due to eviction |
| counter.await(20, TimeUnit.SECONDS); |
| assertFalse(gotException.get()); |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| rolloverStats(); |
| |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertTrue(stats.getBacklogSize() <= 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); |
| } |
| |
| /** |
|  * Runs one producer and two acknowledging consumers concurrently on a namespace that has a |
|  * 10 KiB {@code consumer_backlog_eviction} quota and checks that neither worker thread |
|  * reported an exception. |
|  */ |
| @Test |
| public void testNoEviction() throws Exception { |
|     final String namespace = "prop/ns-quota"; |
|     assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap()); |
|     final BacklogQuota evictionQuota = BacklogQuota.builder() |
|             .limitSize(10 * 1024) |
|             .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
|             .build(); |
|     admin.namespaces().setBacklogQuota(namespace, evictionQuota); |
| |
|     final String topicName = "persistent://prop/ns-quota/topic13"; |
|     final int messageCount = 10; |
| |
|     // Both workers block on the barrier so that publishing and consuming start together; |
|     // the latch lets the test thread wait until both have finished. |
|     final CyclicBarrier startGate = new CyclicBarrier(2); |
|     final CountDownLatch finished = new CountDownLatch(2); |
|     final AtomicBoolean failed = new AtomicBoolean(false); |
|     @Cleanup |
|     final PulsarClient consumerClient = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
| |
|     final Consumer<byte[]> firstConsumer = |
|             consumerClient.newConsumer().topic(topicName).subscriptionName("c13").subscribe(); |
|     final Consumer<byte[]> secondConsumer = |
|             consumerClient.newConsumer().topic(topicName).subscriptionName("c23").subscribe(); |
|     @Cleanup |
|     final PulsarClient producerClient = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
| |
|     final Thread publisher = new Thread(() -> { |
|         try { |
|             startGate.await(); |
|             Producer<byte[]> producer = createProducer(producerClient, topicName); |
|             final byte[] payload = new byte[1024]; |
|             int sent = 0; |
|             while (sent < messageCount) { |
|                 producer.send(payload); |
|                 sent++; |
|             } |
|             producer.close(); |
|         } catch (Exception e) { |
|             failed.set(true); |
|         } finally { |
|             finished.countDown(); |
|         } |
|     }); |
| |
|     final Thread subscriber = new Thread(() -> { |
|         try { |
|             startGate.await(); |
|             int handled = 0; |
|             while (handled < messageCount) { |
|                 // every message is acknowledged on both subscriptions |
|                 Message<byte[]> fromFirst = firstConsumer.receive(); |
|                 firstConsumer.acknowledge(fromFirst); |
|                 Message<byte[]> fromSecond = secondConsumer.receive(); |
|                 secondConsumer.acknowledge(fromSecond); |
|                 handled++; |
|             } |
|         } catch (Exception e) { |
|             failed.set(true); |
|         } finally { |
|             finished.countDown(); |
|         } |
|     }); |
| |
|     publisher.start(); |
|     subscriber.start(); |
|     finished.await(); |
|     assertFalse(failed.get()); |
| } |
| |
| /** |
|  * Runs two producers and two acknowledging consumers concurrently on a namespace that has a |
|  * 15 KiB {@code consumer_backlog_eviction} quota, then checks that the backlog size reported |
|  * by the topic stats does not exceed the quota. |
|  */ |
| @Test |
| public void testEvictionMulti() throws Exception { |
|     assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), |
|             Maps.newHashMap()); |
|     admin.namespaces().setBacklogQuota("prop/ns-quota", |
|             BacklogQuota.builder() |
|                     .limitSize(15 * 1024) |
|                     .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) |
|                     .build()); |
| |
|     final String topic1 = "persistent://prop/ns-quota/topic14"; |
|     final String subName1 = "c14"; |
|     final String subName2 = "c24"; |
|     final int numMsgs = 10; |
| |
|     // Four workers (two producers, two consumers) start together and report completion. |
|     final CyclicBarrier barrier = new CyclicBarrier(4); |
|     final CountDownLatch counter = new CountDownLatch(4); |
|     final AtomicBoolean gotException = new AtomicBoolean(false); |
|     @Cleanup |
|     final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
| |
|     final Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
|     final Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
|     @Cleanup |
|     final PulsarClient client3 = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     @Cleanup |
|     final PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
| |
|     Thread producerThread1 = new Thread(() -> { |
|         try { |
|             barrier.await(); |
|             Producer<byte[]> producer = createProducer(client2, topic1); |
|             byte[] content = new byte[1024]; |
|             for (int i = 0; i < numMsgs; i++) { |
|                 producer.send(content); |
|             } |
|             producer.close(); |
|         } catch (Exception e) { |
|             gotException.set(true); |
|         } finally { |
|             counter.countDown(); |
|         } |
|     }); |
| |
|     Thread producerThread2 = new Thread(() -> { |
|         try { |
|             barrier.await(); |
|             Producer<byte[]> producer = createProducer(client3, topic1); |
|             byte[] content = new byte[1024]; |
|             for (int i = 0; i < numMsgs; i++) { |
|                 producer.send(content); |
|             } |
|             producer.close(); |
|         } catch (Exception e) { |
|             gotException.set(true); |
|         } finally { |
|             counter.countDown(); |
|         } |
|     }); |
| |
|     // Each consumer tries to receive everything both producers publish (2 * numMsgs). |
|     Thread consumerThread1 = new Thread(() -> { |
|         try { |
|             barrier.await(); |
|             for (int i = 0; i < numMsgs * 2; i++) { |
|                 consumer1.acknowledge(consumer1.receive()); |
|             } |
|         } catch (Exception e) { |
|             gotException.set(true); |
|         } finally { |
|             counter.countDown(); |
|         } |
|     }); |
| |
|     Thread consumerThread2 = new Thread(() -> { |
|         try { |
|             barrier.await(); |
|             for (int i = 0; i < numMsgs * 2; i++) { |
|                 consumer2.acknowledge(consumer2.receive()); |
|             } |
|         } catch (Exception e) { |
|             gotException.set(true); |
|         } finally { |
|             counter.countDown(); |
|         } |
|     }); |
| |
|     producerThread1.start(); |
|     producerThread2.start(); |
|     consumerThread1.start(); |
|     consumerThread2.start(); |
|     // Bounded wait whose result is deliberately ignored: if eviction drops messages the |
|     // consumers may never receive all of them, and an unbounded wait would hang the test. |
|     counter.await(20, TimeUnit.SECONDS); |
|     assertFalse(gotException.get()); |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
|     rolloverStats(); |
| |
|     TopicStats stats = admin.topics().getStats(topic1); |
|     // Report the value that is actually compared (backlog size) alongside the storage size. |
|     assertTrue(stats.getBacklogSize() <= 15 * 1024, |
|             "Backlog size is [" + stats.getBacklogSize() + "], storage size is [" |
|                     + stats.getStorageSize() + "]"); |
| } |
| |
| @Test |
| public void testAheadProducerOnHold() throws Exception { |
| assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), |
| Maps.newHashMap()); |
| admin.namespaces().setBacklogQuota("prop/quotahold", |
| BacklogQuota.builder() |
| .limitSize(10 * 1024) |
| .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold) |
| .build()); |
| @Cleanup |
| final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
| .statsInterval(0, TimeUnit.SECONDS).build(); |
| final String topic1 = "persistent://prop/quotahold/hold"; |
| final String subName1 = "c1hold"; |
| final int numMsgs = 10; |
| |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| |
| byte[] content = new byte[1024]; |
| Producer<byte[]> producer = createProducer(client, topic1); |
| for (int i = 0; i <= numMsgs; i++) { |
| try { |
| producer.send(content); |
| LOG.info("sent [{}]", i); |
| } catch (PulsarClientException.TimeoutException cte) { |
| // producer close may cause a timeout on send |
| LOG.info("timeout on [{}]", i); |
| } |
| } |
| |
| for (int i = 0; i < numMsgs; i++) { |
| consumer.receive(); |
| LOG.info("received [{}]", i); |
| } |
| |
| Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| rolloverStats(); |
| TopicStats stats = admin.topics().getStats(topic1); |
| assertEquals(stats.getPublishers().size(), 0, |
| "Number of producers on topic " + topic1 + " are [" + stats.getPublishers().size() + "]"); |
| } |
| |
| /** |
|  * Publishes ten 1 KiB messages to a topic under a 10 KiB {@code producer_request_hold} quota, |
|  * waits a little longer than TIME_TO_CHECK_BACKLOG_QUOTA seconds, and expects further sends to |
|  * fail with a {@link PulsarClientException.TimeoutException}. |
|  */ |
| @Test |
| public void testAheadProducerOnHoldTimeout() throws Exception { |
|     final String namespace = "prop/quotahold"; |
|     assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap()); |
|     final BacklogQuota holdQuota = BacklogQuota.builder() |
|             .limitSize(10 * 1024) |
|             .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold) |
|             .build(); |
|     admin.namespaces().setBacklogQuota(namespace, holdQuota); |
|     @Cleanup |
|     final PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     final String topicName = "persistent://prop/quotahold/holdtimeout"; |
| |
|     // The subscription only has to exist so that a backlog builds up; nothing is consumed. |
|     pulsarClient.newConsumer().topic(topicName).subscriptionName("c1holdtimeout").subscribe(); |
| |
|     final byte[] payload = new byte[1024]; |
|     final Producer<byte[]> publisher = createProducer(pulsarClient, topicName); |
|     int remaining = 10; |
|     while (remaining > 0) { |
|         publisher.send(payload); |
|         remaining--; |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
|     boolean timedOut = false; |
|     try { |
|         // sending beyond the backlog quota must not succeed |
|         publisher.send(payload); |
|         publisher.send(payload); |
|         fail("backlog quota did not exceed"); |
|     } catch (PulsarClientException.TimeoutException expected) { |
|         timedOut = true; |
|     } |
| |
|     assertTrue(timedOut, "timeout did not occur"); |
| } |
| |
| /** |
|  * Publishes ten 1 KiB messages to a topic under a 10 KiB {@code producer_exception} quota, |
|  * waits a little longer than TIME_TO_CHECK_BACKLOG_QUOTA seconds, and expects further sends to |
|  * fail with either a quota-exceeded or a timeout client exception. |
|  */ |
| @Test |
| public void testProducerException() throws Exception { |
|     final String namespace = "prop/quotahold"; |
|     assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap()); |
|     final BacklogQuota exceptionQuota = BacklogQuota.builder() |
|             .limitSize(10 * 1024) |
|             .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
|             .build(); |
|     admin.namespaces().setBacklogQuota(namespace, exceptionQuota); |
|     @Cleanup |
|     final PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     final String topicName = "persistent://prop/quotahold/except"; |
| |
|     // The subscription only has to exist so that a backlog builds up; nothing is consumed. |
|     pulsarClient.newConsumer().topic(topicName).subscriptionName("c1except").subscribe(); |
| |
|     final byte[] payload = new byte[1024]; |
|     final Producer<byte[]> publisher = createProducer(pulsarClient, topicName); |
|     int remaining = 10; |
|     while (remaining > 0) { |
|         publisher.send(payload); |
|         remaining--; |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
|     boolean rejected = false; |
|     try { |
|         // sending beyond the backlog quota must not succeed |
|         publisher.send(payload); |
|         publisher.send(payload); |
|         fail("backlog quota did not exceed"); |
|     } catch (PulsarClientException ce) { |
|         final boolean expectedType = |
|                 ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException |
|                         || ce instanceof PulsarClientException.TimeoutException; |
|         assertTrue(expectedType, ce.getMessage()); |
|         rejected = true; |
|     } |
| |
|     assertTrue(rejected, "backlog exceeded exception did not occur"); |
| } |
| |
| /** |
|  * Exceeds a 10 KiB {@code producer_exception} size quota, verifies that sends are rejected, |
|  * then acknowledges the whole backlog and verifies that the same producer can publish again. |
|  */ |
| @Test |
| public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { |
|     final String namespace = "prop/quotahold"; |
|     assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap()); |
|     final BacklogQuota exceptionQuota = BacklogQuota.builder() |
|             .limitSize(10 * 1024) |
|             .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
|             .build(); |
|     admin.namespaces().setBacklogQuota(namespace, exceptionQuota); |
|     @Cleanup |
|     final PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     final String topicName = "persistent://prop/quotahold/exceptandunblock"; |
|     final String subscription = "c1except"; |
| |
|     final Consumer<byte[]> subscriber = |
|             pulsarClient.newConsumer().topic(topicName).subscriptionName(subscription).subscribe(); |
| |
|     final byte[] payload = new byte[1024]; |
|     final Producer<byte[]> publisher = createProducer(pulsarClient, topicName); |
|     int remaining = 10; |
|     while (remaining > 0) { |
|         publisher.send(payload); |
|         remaining--; |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
|     boolean rejected = false; |
|     try { |
|         // sending beyond the backlog quota must not succeed |
|         publisher.send(payload); |
|         publisher.send(payload); |
|         fail("backlog quota did not exceed"); |
|     } catch (PulsarClientException ce) { |
|         final boolean expectedType = |
|                 ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException |
|                         || ce instanceof PulsarClientException.TimeoutException; |
|         assertTrue(expectedType, ce.getMessage()); |
|         rejected = true; |
|     } |
| |
|     assertTrue(rejected, "backlog exceeded exception did not occur"); |
| |
|     // Drain exactly as many messages as the subscription currently reports as backlog. |
|     final TopicStats topicStats = admin.topics().getStats(topicName); |
|     final int pending = (int) topicStats.getSubscriptions().get(subscription).getMsgBacklog(); |
|     for (int drained = 0; drained < pending; drained++) { |
|         subscriber.acknowledge(subscriber.receive()); |
|     } |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
| |
|     // With the backlog gone, publishing is expected to succeed again. |
|     Exception publishFailure = null; |
|     try { |
|         for (int sent = 0; sent < 5; sent++) { |
|             publisher.send(payload); |
|         } |
|     } catch (Exception e) { |
|         publishFailure = e; |
|     } |
|     assertFalse(publishFailure != null, "unable to publish due to " + publishFailure); |
| } |
| |
| /** |
|  * Exceeds a {@code message_age} backlog quota with the {@code producer_exception} policy while |
|  * precise time-based quota checking is switched on in the broker configuration, verifies that |
|  * sends are rejected, then acknowledges the backlog and verifies that the same producer can |
|  * publish again. |
|  */ |
| @Test |
| public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Exception { |
|     assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), |
|             Maps.newHashMap()); |
|     admin.namespaces().setBacklogQuota("prop/quotahold", |
|             BacklogQuota.builder() |
|                     .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
|                     .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
|                     .build(), BacklogQuota.BacklogQuotaType.message_age); |
|     // NOTE(review): this flag is set on the class-level config and is not restored in this |
|     // method - confirm that the test setup/teardown resets it for the other tests. |
|     config.setPreciseTimeBasedBacklogQuotaCheck(true); |
|     // @Cleanup closes the client even when an assertion below fails (the former trailing |
|     // client.close() was skipped in that case). |
|     @Cleanup |
|     final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; |
|     final String subName1 = "c1except"; |
|     boolean gotException = false; |
|     int numMsgs = 9; |
| |
|     Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| |
|     byte[] content = new byte[1024]; |
|     Producer<byte[]> producer = createProducer(client, topic1); |
|     for (int i = 0; i < numMsgs; i++) { |
|         producer.send(content); |
|     } |
| |
|     // Let the published messages become older than the configured time limit. |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
| |
|     try { |
|         // try to send over backlog quota and make sure it fails |
|         producer.send(content); |
|         fail("backlog quota did not exceed"); |
|     } catch (PulsarClientException ce) { |
|         assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException |
|                 || ce instanceof PulsarClientException.TimeoutException, ce.getMessage()); |
|         gotException = true; |
|     } |
| |
|     assertTrue(gotException, "backlog exceeded exception did not occur"); |
| |
|     // now remove backlog and ensure that producer is unblocked; |
|     TopicStats stats = admin.topics().getStats(topic1); |
|     assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), numMsgs); |
| |
|     for (int i = 0; i < numMsgs; i++) { |
|         consumer.acknowledge(consumer.receive()); |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
|     rolloverStats(); |
|     stats = admin.topics().getStats(topic1); |
|     assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); |
|     // publish should work now |
|     Exception sendException = null; |
|     gotException = false; |
|     try { |
|         for (int i = 0; i < 5; i++) { |
|             producer.send(content); |
|         } |
|     } catch (Exception e) { |
|         gotException = true; |
|         sendException = e; |
|     } |
|     assertFalse(gotException, "unable to publish due to " + sendException); |
| } |
| |
| /** |
|  * Exceeds a {@code message_age} backlog quota with the {@code producer_exception} policy, |
|  * verifies that sends are rejected, then acknowledges the backlog and verifies that the same |
|  * producer can publish again. |
|  */ |
| @Test |
| public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { |
|     assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), |
|             Maps.newHashMap()); |
|     admin.namespaces().setBacklogQuota("prop/quotahold", |
|             BacklogQuota.builder() |
|                     .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) |
|                     .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) |
|                     .build(), BacklogQuota.BacklogQuotaType.message_age); |
|     // @Cleanup closes the client even when an assertion below fails (the former trailing |
|     // client.close() was skipped in that case). |
|     @Cleanup |
|     final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) |
|             .statsInterval(0, TimeUnit.SECONDS).build(); |
|     final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; |
|     final String subName1 = "c1except"; |
|     boolean gotException = false; |
|     int numMsgs = 14; |
| |
|     Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
| |
|     byte[] content = new byte[1024]; |
|     Producer<byte[]> producer = createProducer(client, topic1); |
|     for (int i = 0; i < numMsgs; i++) { |
|         producer.send(content); |
|     } |
| |
|     // Let the published messages become older than the configured time limit. |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
| |
|     try { |
|         // try to send over backlog quota and make sure it fails |
|         producer.send(content); |
|         fail("backlog quota did not exceed"); |
|     } catch (PulsarClientException ce) { |
|         assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException |
|                 || ce instanceof PulsarClientException.TimeoutException, ce.getMessage()); |
|         gotException = true; |
|     } |
| |
|     assertTrue(gotException, "backlog exceeded exception did not occur"); |
| |
|     // now remove backlog and ensure that producer is unblocked; |
|     TopicStats stats = admin.topics().getStats(topic1); |
|     assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), numMsgs); |
| |
|     for (int i = 0; i < numMsgs; i++) { |
|         consumer.acknowledge(consumer.receive()); |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); |
|     rolloverStats(); |
|     stats = admin.topics().getStats(topic1); |
|     assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); |
|     // publish should work now |
|     Exception sendException = null; |
|     gotException = false; |
|     try { |
|         for (int i = 0; i < 5; i++) { |
|             producer.send(content); |
|         } |
|     } catch (Exception e) { |
|         gotException = true; |
|         sendException = e; |
|     } |
|     assertFalse(gotException, "unable to publish due to " + sendException); |
| } |
| |
| /** |
|  * Restarts the broker with a default backlog quota of 10 KiB and the |
|  * {@code consumer_backlog_eviction} policy, publishes twenty 1 KiB messages that two |
|  * subscriptions receive without acknowledging, and checks that the reported backlog size |
|  * stays below the quota. |
|  * |
|  * @param backlogQuotaSizeGB when {@code true} the default limit is configured through the |
|  *                           GB setting, otherwise through the bytes setting |
|  */ |
| @Test(dataProvider = "backlogQuotaSizeGB") |
| public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception { |
| |
|     // The default quota is broker configuration, so the broker is recreated to pick it up. |
|     pulsar.close(); |
|     long backlogQuotaByte = 10 * 1024; |
|     if (backlogQuotaSizeGB) { |
|         config.setBacklogQuotaDefaultLimitGB(((double) backlogQuotaByte) / BacklogQuotaImpl.BYTES_IN_GIGABYTE); |
|     } else { |
|         config.setBacklogQuotaDefaultLimitBytes(backlogQuotaByte); |
|     } |
|     config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
|     pulsar = new PulsarService(config); |
|     pulsar.start(); |
| |
|     @Cleanup |
|     PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS) |
|             .build(); |
| |
|     final String topic1 = "persistent://prop/ns-quota/topic2"; |
|     final String subName1 = "c1"; |
|     final String subName2 = "c2"; |
|     final int numMsgs = 20; |
| |
|     Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); |
|     Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); |
|     Producer<byte[]> producer = createProducer(client, topic1); |
|     byte[] content = new byte[1024]; |
|     for (int i = 0; i < numMsgs; i++) { |
|         producer.send(content); |
|         // received but never acknowledged, so the backlog keeps growing |
|         consumer1.receive(); |
|         consumer2.receive(); |
|     } |
| |
|     Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
|     rolloverStats(); |
| |
|     TopicStats stats = admin.topics().getStats(topic1); |
|     // Report the value that is actually compared (backlog size) alongside the storage size. |
|     assertTrue(stats.getBacklogSize() < 10 * 1024, |
|             "Backlog size is [" + stats.getBacklogSize() + "], storage size is [" |
|                     + stats.getStorageSize() + "]"); |
| } |
| /** Class-wide logger; the tests use it to trace publish and receive progress. */ |
| private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class); |
| } |