| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.compaction; |
| |
| import static org.mockito.Mockito.anyLong; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertSame; |
| import static org.testng.Assert.assertTrue; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import io.netty.buffer.ByteBuf; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import lombok.Cleanup; |
| import lombok.SneakyThrows; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.api.OpenBuilder; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerInfo; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.commons.lang3.reflect.FieldUtils; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.broker.BrokerTestUtil; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.namespace.NamespaceService; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; |
| import org.apache.pulsar.broker.service.persistent.PersistentSubscription; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.service.persistent.SystemTopic; |
| import org.apache.pulsar.client.admin.LongRunningProcessStatus; |
| import org.apache.pulsar.client.api.CompressionType; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.CryptoKeyReader; |
| import org.apache.pulsar.client.api.EncryptionKeyInfo; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.MessageRoutingMode; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProducerBuilder; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Reader; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.client.impl.BatchMessageIdImpl; |
| import org.apache.pulsar.client.impl.ConsumerImpl; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.protocol.Markers; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.awaitility.Awaitility; |
| import org.mockito.Mockito; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "flaky") |
| @Slf4j |
| public class CompactionTest extends MockedPulsarServiceBaseTest { |
| private ScheduledExecutorService compactionScheduler; |
| private BookKeeper bk; |
| |
| @BeforeMethod |
| @Override |
| public void setup() throws Exception { |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); |
| admin.tenants().createTenant("my-property", |
| new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); |
| admin.namespaces().createNamespace("my-property/use/my-ns"); |
| |
| compactionScheduler = Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); |
| bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| public void cleanup() throws Exception { |
| super.internalCleanup(); |
| |
| if (compactionScheduler != null) { |
| compactionScheduler.shutdownNow(); |
| } |
| } |
| |
| @Test |
| public void testCompaction() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| final int numMessages = 20; |
| final int maxKeys = 10; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| Map<String, byte[]> expected = new HashMap<>(); |
| List<Pair<String, byte[]>> all = new ArrayList<>(); |
| Random r = new Random(0); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| for (int j = 0; j < numMessages; j++) { |
| int keyIndex = r.nextInt(maxKeys); |
| String key = "key" + keyIndex; |
| byte[] data = ("my-message-" + key + "-" + j).getBytes(); |
| producer.newMessage().key(key).value(data).send(); |
| expected.put(key, data); |
| all.add(Pair.of(key, data)); |
| } |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); |
| // Compacted topic ledger should have same number of entry equals to number of unique key. |
| Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries); |
| Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); |
| Assert.assertFalse(internalStats.compactedLedger.offloaded); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| while (true) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| Assert.assertEquals(expected.remove(m.getKey()), m.getData()); |
| if (expected.isEmpty()) { |
| break; |
| } |
| } |
| Assert.assertTrue(expected.isEmpty()); |
| } |
| |
| // can get full backlog if read compacted disabled |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(false).subscribe()) { |
| while (true) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| Pair<String, byte[]> expectedMessage = all.remove(0); |
| Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); |
| Assert.assertEquals(expectedMessage.getRight(), m.getData()); |
| if (all.isEmpty()) { |
| break; |
| } |
| } |
| Assert.assertTrue(all.isEmpty()); |
| } |
| } |
| |
| @Test |
| public void testCompactionWithReader() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| final int numMessages = 20; |
| final int maxKeys = 10; |
| |
| // Configure retention to ensue data is retained for reader |
| admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1)); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| Map<String, String> expected = new HashMap<>(); |
| List<Pair<String, String>> all = new ArrayList<>(); |
| Random r = new Random(0); |
| |
| for (int j = 0; j < numMessages; j++) { |
| int keyIndex = r.nextInt(maxKeys); |
| String key = "key" + keyIndex; |
| String value = "my-message-" + key + "-" + j; |
| producer.newMessage().key(key).value(value.getBytes()).send(); |
| expected.put(key, value); |
| all.add(Pair.of(key, value)); |
| } |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true) |
| .startMessageId(MessageId.earliest).create()) { |
| while (true) { |
| Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS); |
| Assert.assertEquals(expected.remove(m.getKey()), new String(m.getData())); |
| if (expected.isEmpty()) { |
| break; |
| } |
| } |
| Assert.assertTrue(expected.isEmpty()); |
| } |
| |
| // can get full backlog if read compacted disabled |
| try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(false) |
| .startMessageId(MessageId.earliest).create()) { |
| while (true) { |
| Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS); |
| Pair<String, String> expectedMessage = all.remove(0); |
| Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); |
| Assert.assertEquals(expectedMessage.getRight(), new String(m.getData())); |
| if (all.isEmpty()) { |
| break; |
| } |
| } |
| Assert.assertTrue(all.isEmpty()); |
| } |
| } |
| |
| |
| @Test |
| public void testReadCompactedBeforeCompaction() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| producer.newMessage().key("key0").value("content0".getBytes()).send(); |
| producer.newMessage().key("key0").value("content1".getBytes()).send(); |
| producer.newMessage().key("key0").value("content2".getBytes()).send(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content0".getBytes()); |
| |
| m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content1".getBytes()); |
| |
| m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| } |
| |
| @Test |
| public void testReadEntriesAfterCompaction() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| producer.newMessage().key("key0").value("content0".getBytes()).send(); |
| producer.newMessage().key("key0").value("content1".getBytes()).send(); |
| producer.newMessage().key("key0").value("content2".getBytes()).send(); |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| producer.newMessage().key("key0").value("content3".getBytes()).send(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| |
| m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content3".getBytes()); |
| } |
| } |
| |
| @Test |
| public void testSeekEarliestAfterCompaction() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| producer.newMessage().key("key0").value("content0".getBytes()).send(); |
| producer.newMessage().key("key0").value("content1".getBytes()).send(); |
| producer.newMessage().key("key0").value("content2".getBytes()).send(); |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| consumer.seek(MessageId.earliest); |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(false).subscribe()) { |
| consumer.seek(MessageId.earliest); |
| |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content0".getBytes()); |
| |
| m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content1".getBytes()); |
| |
| m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| } |
| |
| @Test |
| public void testBrokerRestartAfterCompaction() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| producer.newMessage().key("key0").value("content0".getBytes()).send(); |
| producer.newMessage().key("key0").value("content1".getBytes()).send(); |
| producer.newMessage().key("key0").value("content2".getBytes()).send(); |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| |
| stopBroker(); |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| consumer.receive(); |
| Assert.fail("Shouldn't have been able to receive anything"); |
| } catch (PulsarClientException e) { |
| // correct behaviour |
| } |
| startBroker(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content2".getBytes()); |
| } |
| } |
| |
| @Test |
| public void testCompactEmptyTopic() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| |
| producer.newMessage().key("key0").value("content0".getBytes()).send(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(); |
| Assert.assertEquals(m.getKey(), "key0"); |
| Assert.assertEquals(m.getData(), "content0".getBytes()); |
| } |
| } |
| |
| @Test |
| public void testFirstMessageRetained() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create()) { |
| producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); |
| producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync(); |
| producer.newMessage().key("key2").value("my-message-3".getBytes()).send(); |
| } |
| |
| // Read messages before compaction to get ids |
| List<Message<byte[]>> messages = new ArrayList<>(); |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()) { |
| messages.add(consumer.receive()); |
| messages.add(consumer.receive()); |
| messages.add(consumer.receive()); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // Check that messages after compaction have same ids |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId()); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId()); |
| } |
| } |
| |
| @Test |
| public void testBatchMessageIdsDontChange() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create() |
| ) { |
| producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); |
| producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync(); |
| producer.newMessage().key("key2").value("my-message-3".getBytes()).send(); |
| } |
| |
| // Read messages before compaction to get ids |
| List<Message<byte[]>> messages = new ArrayList<>(); |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()) { |
| messages.add(consumer.receive()); |
| messages.add(consumer.receive()); |
| messages.add(consumer.receive()); |
| } |
| |
| // Ensure all messages are in same batch |
| Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(), |
| ((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId()); |
| Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(), |
| ((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId()); |
| Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(), |
| ((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId()); |
| Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(), |
| ((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId()); |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // Check that messages after compaction have same ids |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId()); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId()); |
| } |
| } |
| |
| @Test |
| public void testBatchMessageWithNullValue() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .receiverQueueSize(1).readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create() |
| ) { |
| // batch 1 |
| producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); |
| producer.newMessage().key("key1").value(null).sendAsync(); |
| producer.newMessage().key("key2").value("my-message-3".getBytes()).send(); |
| |
| // batch 2 |
| producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync(); |
| producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync(); |
| producer.newMessage().key("key3").value("my-message-6".getBytes()).send(); |
| |
| // batch 3 |
| producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync(); |
| producer.newMessage().key("key4").value(null).sendAsync(); |
| producer.newMessage().key("key5").value("my-message-9".getBytes()).send(); |
| } |
| |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // Read messages before compaction to get ids |
| List<Message<byte[]>> messages = new ArrayList<>(); |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) { |
| while (true) { |
| Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); |
| if (message == null) { |
| break; |
| } |
| messages.add(message); |
| } |
| } |
| |
| assertEquals(messages.size(), 3); |
| assertEquals(messages.get(0).getKey(), "key2"); |
| assertEquals(messages.get(1).getKey(), "key3"); |
| assertEquals(messages.get(2).getKey(), "key5"); |
| } |
| |
| @Test |
| public void testWholeBatchCompactedOut() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| Producer<byte[]> producerBatch = pulsarClient.newProducer().topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create()) { |
| producerBatch.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); |
| producerBatch.newMessage().key("key1").value("my-message-2".getBytes()).sendAsync(); |
| producerBatch.newMessage().key("key1").value("my-message-3".getBytes()).sendAsync(); |
| producerNormal.newMessage().key("key1").value("my-message-4".getBytes()).send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message = consumer.receive(); |
| Assert.assertEquals(message.getKey(), "key1"); |
| Assert.assertEquals(new String(message.getData()), "my-message-4"); |
| } |
| } |
| |
| @DataProvider(name = "retainNullKey") |
| public static Object[][] retainNullKey() { |
| return new Object[][] {{true}, {false}}; |
| } |
| |
| @Test(dataProvider = "retainNullKey") |
| public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception { |
| conf.setTopicCompactionRetainNullKey(retainNullKey); |
| restartBroker(); |
| |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create(); |
| Producer<byte[]> producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) |
| .enableBatching(true).batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { |
| producerNormal.newMessage().value("my-message-1".getBytes()).send(); |
| |
| producerBatch.newMessage().value("my-message-2".getBytes()).sendAsync(); |
| producerBatch.newMessage().key("key1").value("my-message-3".getBytes()).sendAsync(); |
| producerBatch.newMessage().key("key1").value("my-message-4".getBytes()).send(); |
| |
| producerBatch.newMessage().key("key2").value("my-message-5".getBytes()).sendAsync(); |
| producerBatch.newMessage().key("key2").value("my-message-6".getBytes()).sendAsync(); |
| producerBatch.newMessage().value("my-message-7".getBytes()).send(); |
| |
| producerNormal.newMessage().value("my-message-8".getBytes()).send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| List<Pair<String, String>> result = new ArrayList<>(); |
| while (true) { |
| Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS); |
| if (message == null) { |
| break; |
| } |
| result.add(Pair.of(message.getKey(), message.getData() == null ? null : new String(message.getData()))); |
| } |
| |
| List<Pair<String, String>> expectList; |
| if (retainNullKey) { |
| expectList = List.of( |
| Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"), |
| Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"), |
| Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8")); |
| } else { |
| expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6")); |
| } |
| Assert.assertEquals(result, expectList); |
| } |
| } |
| |
| |
| @Test |
| public void testEmptyPayloadDeletes() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| Producer<byte[]> producerBatch = pulsarClient.newProducer() |
| .topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .create()) { |
| |
| // key0 persists through it all |
| producerNormal.newMessage() |
| .key("key0") |
| .value("my-message-0".getBytes()) |
| .send(); |
| |
| // key1 is added but then deleted |
| producerNormal.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .send(); |
| |
| producerNormal.newMessage() |
| .key("key1") |
| .send(); |
| |
| // key2 is added but deleted in same batch |
| producerBatch.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key3") |
| .value("my-message-3".getBytes()) |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key2").send(); |
| |
| // key3 is added in previous batch, deleted in this batch |
| producerBatch.newMessage() |
| .key("key3") |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key4") |
| .value("my-message-3".getBytes()) |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key4") |
| .send(); |
| |
| // key4 is added, deleted, then resurrected |
| producerNormal.newMessage() |
| .key("key4") |
| .value("my-message-4".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key0"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-0"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key4"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-4"); |
| } |
| } |
| |
| @Test |
| public void testEmptyPayloadDeletesWhenCompressed() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .compressionType(CompressionType.LZ4) |
| .create(); |
| Producer<byte[]> producerBatch = pulsarClient.newProducer() |
| .topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .compressionType(CompressionType.LZ4) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .create()) { |
| |
| // key0 persists through it all |
| producerNormal.newMessage() |
| .key("key0") |
| .value("my-message-0".getBytes()).send(); |
| |
| // key1 is added but then deleted |
| producerNormal.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()).send(); |
| |
| producerNormal.newMessage() |
| .key("key1").send(); |
| |
| // key2 is added but deleted in same batch |
| producerBatch.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()).sendAsync(); |
| producerBatch.newMessage() |
| .key("key3") |
| .value("my-message-3".getBytes()).sendAsync(); |
| producerBatch.newMessage() |
| .key("key2").send(); |
| |
| // key3 is added in previous batch, deleted in this batch |
| producerBatch.newMessage() |
| .key("key3") |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key4") |
| .value("my-message-3".getBytes()) |
| .sendAsync(); |
| producerBatch.newMessage() |
| .key("key4") |
| .send(); |
| |
| // key4 is added, deleted, then resurrected |
| producerNormal.newMessage() |
| .key("key4") |
| .value("my-message-4".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key0"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-0"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key4"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-4"); |
| } |
| } |
| |
| // test compact no keys |
| |
| @Test |
| public void testCompactorReadsCompacted() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // capture opened ledgers |
| Set<Long> ledgersOpened = Sets.newConcurrentHashSet(); |
| when(mockBookKeeper.newOpenLedgerOp()).thenAnswer( |
| (invocation) -> { |
| OpenBuilder builder = (OpenBuilder)spy(invocation.callRealMethod()); |
| when(builder.withLedgerId(anyLong())).thenAnswer( |
| (invocation2) -> { |
| ledgersOpened.add((Long)invocation2.getArguments()[0]); |
| return invocation2.callRealMethod(); |
| }); |
| return builder; |
| }); |
| |
| // subscribe before sending anything, so that we get all messages in sub1 |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); |
| |
| // create the topic on the broker |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create()) { |
| producerNormal.newMessage() |
| .key("key0") |
| .value("my-message-0".getBytes()) |
| .send(); |
| } |
| |
| // force ledger roll |
| pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); |
| |
| // write a message to avoid issue #1517 |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create()) { |
| producerNormal.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .send(); |
| } |
| |
| // verify second ledger created |
| String managedLedgerName = ((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get()) |
| .getManagedLedger().getName(); |
| ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); |
| Assert.assertEquals(info.ledgers.size(), 2); |
| Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // should have opened all except last to read |
| Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); |
| ledgersOpened.clear(); |
| |
| // force broker to close resources for topic |
| pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); |
| |
| // write a message to avoid issue #1517 |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create()) { |
| producerNormal.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .send(); |
| } |
| |
| info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName); |
| Assert.assertEquals(info.ledgers.size(), 3); |
| |
| // should only have opened the penultimate ledger to get stat |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); |
| ledgersOpened.clear(); |
| |
| // compact the topic again |
| compactor.compact(topic).get(); |
| |
| // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data. |
| // last ledger already open for writing |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); |
| Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); |
| Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); |
| |
| // all three messages should be there when we read compacted |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key0"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-0"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key1"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-1"); |
| |
| Message<byte[]> message3 = consumer.receive(); |
| Assert.assertEquals(message3.getKey(), "key2"); |
| Assert.assertEquals(new String(message3.getData()), "my-message-2"); |
| } |
| } |
| |
| @Test |
| public void testCompactCompressedNoBatch() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .compressionType(CompressionType.LZ4).enableBatching(false).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| } |
| } |
| |
| @Test |
| public void testCompactCompressedBatching() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .compressionType(CompressionType.LZ4) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| } |
| } |
| |
| class EncKeyReader implements CryptoKeyReader { |
| EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); |
| |
| @Override |
| public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { |
| String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; |
| if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { |
| try { |
| keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); |
| return keyInfo; |
| } catch (IOException e) { |
| Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); |
| } |
| } else { |
| Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); |
| } |
| return null; |
| } |
| |
| @Override |
| public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { |
| String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; |
| if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { |
| try { |
| keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); |
| return keyInfo; |
| } catch (IOException e) { |
| Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); |
| } |
| } else { |
| Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); |
| } |
| return null; |
| } |
| } |
| |
| @Test |
| public void testCompactEncryptedNoBatch() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .enableBatching(false).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // Check that messages after compaction have same ids |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()) |
| .readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| } |
| } |
| |
| @Test |
| public void testCompactEncryptedBatching() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // with encryption, all messages are passed through compaction as it doesn't |
| // have the keys to decrypt the batch payload |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()) |
| .readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-2"); |
| |
| Message<byte[]> message3 = consumer.receive(); |
| Assert.assertEquals(message3.getKey(), "key2"); |
| Assert.assertEquals(new String(message3.getData()), "my-message-3"); |
| } |
| } |
| |
| @Test |
| public void testCompactEncryptedAndCompressedNoBatch() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .compressionType(CompressionType.LZ4) |
| .enableBatching(false).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // Check that messages after compaction have same ids |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()) |
| .readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-3"); |
| } |
| } |
| |
| @Test |
| public void testCompactEncryptedAndCompressedBatching() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .compressionType(CompressionType.LZ4) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { |
| producer.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()) |
| .sendAsync(); |
| producer.newMessage() |
| .key("key2") |
| .value("my-message-3".getBytes()) |
| .send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // with encryption, all messages are passed through compaction as it doesn't |
| // have the keys to decrypt the batch payload |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader()) |
| .readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key1"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-1"); |
| |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-2"); |
| |
| Message<byte[]> message3 = consumer.receive(); |
| Assert.assertEquals(message3.getKey(), "key2"); |
| Assert.assertEquals(new String(message3.getData()), "my-message-3"); |
| } |
| } |
| |
| @Test |
| public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| // subscribe before sending anything, so that we get all messages |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe().close(); |
| |
| try (Producer<byte[]> producerNormal = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .create(); |
| Producer<byte[]> producerBatch = pulsarClient.newProducer() |
| .topic(topic) |
| .maxPendingMessages(3) |
| .enableBatching(true) |
| .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()) |
| .batchingMaxMessages(3) |
| .batchingMaxPublishDelay(1, TimeUnit.HOURS) |
| .create()) { |
| |
| // key0 persists through it all |
| producerNormal.newMessage() |
| .key("key0") |
| .value("my-message-0".getBytes()).send(); |
| |
| // key1 is added but then deleted |
| producerNormal.newMessage() |
| .key("key1") |
| .value("my-message-1".getBytes()).send(); |
| |
| producerNormal.newMessage() |
| .key("key1") |
| .send(); |
| |
| // key2 is added but deleted in same batch |
| producerBatch.newMessage() |
| .key("key2") |
| .value("my-message-2".getBytes()).sendAsync(); |
| producerBatch.newMessage() |
| .key("key3") |
| .value("my-message-3".getBytes()).sendAsync(); |
| producerBatch.newMessage() |
| .key("key2").send(); |
| |
| // key4 is added, deleted, then resurrected |
| producerNormal.newMessage() |
| .key("key4") |
| .value("my-message-4".getBytes()).send(); |
| } |
| |
| // compact the topic |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) |
| .cryptoKeyReader(new EncKeyReader()) |
| .subscriptionName("sub1").readCompacted(true).subscribe()){ |
| Message<byte[]> message1 = consumer.receive(); |
| Assert.assertEquals(message1.getKey(), "key0"); |
| Assert.assertEquals(new String(message1.getData()), "my-message-0"); |
| |
| // see all messages from batch |
| Message<byte[]> message2 = consumer.receive(); |
| Assert.assertEquals(message2.getKey(), "key2"); |
| Assert.assertEquals(new String(message2.getData()), "my-message-2"); |
| |
| Message<byte[]> message3 = consumer.receive(); |
| Assert.assertEquals(message3.getKey(), "key3"); |
| Assert.assertEquals(new String(message3.getData()), "my-message-3"); |
| |
| Message<byte[]> message4 = consumer.receive(); |
| Assert.assertEquals(message4.getKey(), "key2"); |
| Assert.assertEquals(new String(message4.getData()), ""); |
| |
| Message<byte[]> message5 = consumer.receive(); |
| Assert.assertEquals(message5.getKey(), "key4"); |
| Assert.assertEquals(new String(message5.getData()), "my-message-4"); |
| } |
| } |
| |
| @DataProvider(name = "lastDeletedBatching") |
| public static Object[][] lastDeletedBatching() { |
| return new Object[][] {{true}, {false}}; |
| } |
| |
| @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") |
| public void testCompactionWithLastDeletedKey(boolean batching) throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| producer.newMessage().key("1").value("1".getBytes()).send(); |
| producer.newMessage().key("2").value("2".getBytes()).send(); |
| producer.newMessage().key("3").value("3".getBytes()).send(); |
| producer.newMessage().key("1").value("".getBytes()).send(); |
| producer.newMessage().key("2").value("".getBytes()).send(); |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| Set<String> expected = Sets.newHashSet("3"); |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| assertTrue(expected.remove(m.getKey())); |
| } |
| } |
| |
| @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") |
| public void testEmptyCompactionLedger(boolean batching) throws Exception { |
| String topic = "persistent://my-property/use/my-ns/my-topic1"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| producer.newMessage().key("1").value("1".getBytes()).send(); |
| producer.newMessage().key("2").value("2".getBytes()).send(); |
| producer.newMessage().key("1").value("".getBytes()).send(); |
| producer.newMessage().key("2").value("".getBytes()).send(); |
| |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| assertNull(m); |
| } |
| } |
| |
| @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") |
| public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception { |
| final String topic = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString(); |
| |
| final int messages = 10; |
| |
| // 1.create producer and publish message to the topic. |
| ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic); |
| if (!batchEnabled) { |
| builder.enableBatching(false); |
| } else { |
| builder.batchingMaxMessages(messages / 5); |
| } |
| |
| Producer<byte[]> producer = builder.create(); |
| |
| List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages); |
| for (int i = 0; i < messages; i++) { |
| futures.add(producer.newMessage().keyBytes("1".getBytes()).value("".getBytes()).sendAsync()); |
| } |
| |
| FutureUtil.waitForAll(futures).get(); |
| |
| // 2.compact the topic. |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| assertNull(m); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testBatchAndNonBatchWithoutEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException { |
| final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithoutEmptyPayload" + UUID.randomUUID().toString(); |
| |
| // 1.create producer and publish message to the topic. |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(true) |
| .batchingMaxPublishDelay(1, TimeUnit.DAYS) |
| .create(); |
| |
| final String k1 = "k1"; |
| final String k2 = "k2"; |
| producer.newMessage().key(k1).value("0".getBytes()).send(); |
| List<CompletableFuture<MessageId>> futures = new ArrayList<>(7); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| producer.newMessage().key(k1).value("3".getBytes()).send(); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| |
| for (int i = 0; i < 3; i++) { |
| futures.add(producer.newMessage().key(k2).value((i + "").getBytes()).sendAsync()); |
| } |
| |
| producer.newMessage().key(k2).value("3".getBytes()).send(); |
| producer.flush(); |
| FutureUtil.waitForAll(futures).get(); |
| |
| // 2.compact the topic. |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { |
| Message<byte[]> m1 = consumer.receive(2, TimeUnit.SECONDS); |
| Message<byte[]> m2 = consumer.receive(2, TimeUnit.SECONDS); |
| assertNotNull(m1); |
| assertNotNull(m2); |
| assertEquals(m1.getKey(), k1); |
| assertEquals(new String(m1.getValue()), "5"); |
| assertEquals(m2.getKey(), k2); |
| assertEquals(new String(m2.getValue()), "3"); |
| Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS); |
| assertNull(none); |
| } |
| } |
| @Test(timeOut = 20000) |
| public void testBatchAndNonBatchWithEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException { |
| final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString(); |
| |
| // 1.create producer and publish message to the topic. |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(true) |
| .batchingMaxPublishDelay(1, TimeUnit.DAYS) |
| .create(); |
| |
| final String k1 = "k1"; |
| final String k2 = "k2"; |
| final String k3 = "k3"; |
| producer.newMessage().key(k1).value("0".getBytes()).send(); |
| List<CompletableFuture<MessageId>> futures = new ArrayList<>(7); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| producer.newMessage().key(k1).value("3".getBytes()).send(); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| |
| for (int i = 0; i < 3; i++) { |
| futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| |
| producer.newMessage().key(k2).value("".getBytes()).send(); |
| |
| producer.newMessage().key(k3).value("0".getBytes()).send(); |
| |
| FutureUtil.waitForAll(futures).get(); |
| |
| // 2.compact the topic. |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { |
| Message<byte[]> m1 = consumer.receive(); |
| Message<byte[]> m2 = consumer.receive(); |
| assertNotNull(m1); |
| assertNotNull(m2); |
| assertEquals(m1.getKey(), k1); |
| assertEquals(m2.getKey(), k3); |
| assertEquals(new String(m1.getValue()), "5"); |
| assertEquals(new String(m2.getValue()), "0"); |
| Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS); |
| assertNull(none); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testBatchAndNonBatchEndOfEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException { |
| final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString(); |
| |
| // 1.create producer and publish message to the topic. |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(true) |
| .batchingMaxPublishDelay(1, TimeUnit.DAYS) |
| .create(); |
| |
| final String k1 = "k1"; |
| final String k2 = "k2"; |
| producer.newMessage().key(k1).value("0".getBytes()).send(); |
| List<CompletableFuture<MessageId>> futures = new ArrayList<>(7); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| producer.newMessage().key(k1).value("3".getBytes()).send(); |
| for (int i = 0; i < 2; i++) { |
| futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| |
| for (int i = 0; i < 3; i++) { |
| futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync()); |
| } |
| producer.flush(); |
| |
| producer.newMessage().key(k2).value("".getBytes()).send(); |
| |
| FutureUtil.waitForAll(futures).get(); |
| |
| // 2.compact the topic. |
| Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); |
| compactor.compact(topic).get(); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { |
| Message<byte[]> m1 = consumer.receive(); |
| assertNotNull(m1); |
| assertEquals(m1.getKey(), k1); |
| assertEquals(new String(m1.getValue()), "5"); |
| Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS); |
| assertNull(none); |
| } |
| } |
| |
/**
 * Writes ten values for one key, compacts, writes ten more, and compacts again. Checks that only the
 * newest value ("19") is read afterwards.
 *
 * @param batchEnabled whether the producer batches messages (in batches of {@code messages / 5})
 */
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
    final String topic = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();

    final int messages = 10;
    final String key = "1";

    // 1.create producer and publish message to the topic.
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
    if (!batchEnabled) {
        builder.enableBatching(false);
    } else {
        builder.batchingMaxMessages(messages / 5);
    }

    // try-with-resources: the original never closed this producer
    try (Producer<byte[]> producer = builder.create()) {
        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
        }

        FutureUtil.waitForAll(futures).get();

        // 2.compact the topic.
        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic).get();

        // 3. Send more ten messages
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();

        // 4.compact again.
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m1 = consumer.receive();
            assertNotNull(m1);
            assertEquals(m1.getKey(), key);
            assertEquals(new String(m1.getValue()), "19");
            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
    }
}
| |
/**
 * Verifies what a read-compacted consumer sees across compaction boundaries for a single key.
 * <ol>
 *   <li>After compacting values 0..9 and publishing 10..19 without compacting, the consumer
 *       reads the compacted value "9" followed by the 10 uncompacted messages.</li>
 *   <li>After publishing an empty (tombstone) value and compacting, a new subscription reads
 *       nothing.</li>
 *   <li>After publishing 20..29 without compacting, a new subscription reads exactly those.</li>
 * </ol>
 *
 * @param batchEnabled {@code true} to publish with batching (at most {@code messages / 5}
 *                     messages per batch), {@code false} to publish every message on its own
 */
// The previous timeout of 2,000,000 ms (~33 min) effectively disabled the timeout. 60s still
// leaves generous room for three compactions and the 2s "no more messages" receive waits.
@Test(timeOut = 60000, dataProvider = "lastDeletedBatching")
public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
    final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();

    final int messages = 10;
    final String key = "1";

    // 1.create producer and publish message to the topic.
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
    if (!batchEnabled) {
        builder.enableBatching(false);
    } else {
        builder.batchingMaxMessages(messages / 5);
    }

    // @Cleanup closes the producer when the test exits, including when an assertion fails.
    @Cleanup
    Producer<byte[]> producer = builder.create();

    List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
    for (int i = 0; i < messages; i++) {
        futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
    }

    FutureUtil.waitForAll(futures).get();

    // 2.compact the topic.
    Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic).get();

    // 3. Send ten more messages (values 10..19) that are NOT compacted.
    futures.clear();
    for (int i = 0; i < messages; i++) {
        futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
    }
    FutureUtil.waitForAll(futures).get();
    try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("sub1")
            .readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()) {
        // 1 compacted message ("9") + 10 uncompacted messages ("10".."19").
        for (int i = 0; i < 11; i++) {
            Message<byte[]> received = consumer.receive();
            assertNotNull(received);
            assertEquals(received.getKey(), key);
            assertEquals(new String(received.getValue()), i + 9 + "");
            consumer.acknowledge(received);
        }
        Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
        assertNull(none);
    }

    // 4.Send empty message to delete the key-value in the compacted topic.
    producer.newMessage().key(key).value(("").getBytes()).send();

    // 5.compact the topic.
    compactor.compact(topic).get();

    try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("sub2")
            .readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()) {
        Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
        assertNull(none);
    }

    // 6. Send ten more messages (values 20..19+messages) that are NOT compacted. The list is
    // cleared first, as in the earlier rounds, so only this round's futures are awaited.
    futures.clear();
    for (int i = 0; i < messages; i++) {
        futures.add(producer.newMessage().key(key).value((i + 20 + "").getBytes()).sendAsync());
    }
    FutureUtil.waitForAll(futures).get();

    try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("sub3")
            .readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()) {
        for (int i = 0; i < 10; i++) {
            Message<byte[]> received = consumer.receive();
            assertNotNull(received);
            assertEquals(received.getKey(), key);
            assertEquals(new String(received.getValue()), i + 20 + "");
            consumer.acknowledge(received);
        }
        Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
        assertNull(none);
    }
}
| |
| @SneakyThrows |
| @Test |
| public void testHealthCheckTopicNotCompacted() { |
| NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); |
| String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"; |
| NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); |
| String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck"; |
| Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicV1).create(); |
| Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicV2).create(); |
| Optional<Topic> topicReferenceV1 = pulsar.getBrokerService().getTopic(topicV1, false).join(); |
| Optional<Topic> topicReferenceV2 = pulsar.getBrokerService().getTopic(topicV2, false).join(); |
| assertFalse(((SystemTopic)topicReferenceV1.get()).isCompactionEnabled()); |
| assertFalse(((SystemTopic)topicReferenceV2.get()).isCompactionEnabled()); |
| producer1.close(); |
| producer2.close(); |
| } |
| |
| @Test(timeOut = 60000) |
| public void testCompactionWithMarker() throws Exception { |
| String namespace = "my-property/use/my-ns"; |
| final TopicName dest = TopicName.get( |
| BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker")); |
| admin.topics().createNonPartitionedTopic(dest.toString()); |
| @Cleanup |
| Consumer<byte[]> consumer = pulsarClient.newConsumer() |
| .topic(dest.toString()) |
| .subscriptionName("test-compaction-sub") |
| .subscriptionType(SubscriptionType.Exclusive) |
| .readCompacted(true) |
| .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) |
| .subscribe(); |
| @Cleanup |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(dest.toString()) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| producer.send("msg-1".getBytes(StandardCharsets.UTF_8)); |
| Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join(); |
| Assert.assertTrue(topic.isPresent()); |
| PersistentTopic persistentTopic = (PersistentTopic) topic.get(); |
| Random random = new Random(); |
| for (int i = 0; i < 100; i++) { |
| int rad = random.nextInt(3); |
| ByteBuf marker; |
| if (rad == 0) { |
| marker = Markers.newTxnCommitMarker(-1L, 0, i); |
| } else if (rad == 1) { |
| marker = Markers.newTxnAbortMarker(-1L, 0, i); |
| } else { |
| marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1"); |
| } |
| persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() { |
| @Override |
| public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| // |
| } |
| |
| @Override |
| public void addFailed(ManagedLedgerException exception, Object ctx) { |
| // |
| } |
| }, null); |
| marker.release(); |
| } |
| producer.send("msg-2".getBytes(StandardCharsets.UTF_8)); |
| admin.topics().triggerCompaction(dest.toString()); |
| Awaitility.await() |
| .atMost(50, TimeUnit.SECONDS) |
| .pollInterval(1, TimeUnit.SECONDS) |
| .untilAsserted(() -> { |
| long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId; |
| Assert.assertNotEquals(ledgerId, -1L); |
| }); |
| } |
| |
| @Test(timeOut = 100000) |
| public void testReceiverQueueSize() throws Exception { |
| final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID(); |
| final String subName = "my-sub"; |
| final int receiveQueueSize = 1; |
| @Cleanup |
| PulsarClient client = newPulsarClient(lookupUrl.toString(), 100); |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .enableBatching(false).topic(topicName).create(); |
| |
| for (int i = 0; i < 10; i++) { |
| producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync(); |
| } |
| producer.flush(); |
| |
| admin.topics().triggerCompaction(topicName); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertEquals(admin.topics().compactionStatus(topicName).status, |
| LongRunningProcessStatus.Status.SUCCESS); |
| }); |
| |
| ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING) |
| .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) |
| .subscribe(); |
| |
| //Give some time to consume |
| Awaitility.await() |
| .untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), |
| receiveQueueSize)); |
| consumer.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testDispatcherMaxReadSizeBytes() throws Exception { |
| final String topicName = |
| "persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID(); |
| final String subName = "my-sub"; |
| final int receiveQueueSize = 1; |
| @Cleanup |
| PulsarClient client = newPulsarClient(lookupUrl.toString(), 100); |
| Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) |
| .topic(topicName).create(); |
| |
| for (int i = 0; i < 10; i+=2) { |
| producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send(); |
| } |
| producer.flush(); |
| |
| admin.topics().triggerCompaction(topicName); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertEquals(admin.topics().compactionStatus(topicName).status, |
| LongRunningProcessStatus.Status.SUCCESS); |
| }); |
| |
| admin.topics().unload(topicName); |
| |
| ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES) |
| .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) |
| .subscribe(); |
| |
| PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); |
| PersistentSubscription persistentSubscription = topic.getSubscriptions().get(subName); |
| PersistentDispatcherSingleActiveConsumer dispatcher = |
| Mockito.spy((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()); |
| FieldUtils.writeDeclaredField(persistentSubscription, "dispatcher", dispatcher, true); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertSame(consumer.getStats().getMsgNumInReceiverQueue(), 1); |
| }); |
| |
| consumer.increaseAvailablePermits(2); |
| |
| Thread.sleep(2000); |
| |
| Mockito.verify(dispatcher, Mockito.atLeastOnce()) |
| .readEntriesComplete(Mockito.argThat(argument -> argument.size() == 1), |
| Mockito.any(PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.class)); |
| |
| consumer.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testCompactionDuplicate() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate"; |
| final int numMessages = 1000; |
| final int maxKeys = 800; |
| |
| @Cleanup |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // trigger compaction (create __compaction cursor) |
| admin.topics().triggerCompaction(topic); |
| |
| Map<String, byte[]> expected = new HashMap<>(); |
| Random r = new Random(0); |
| |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); |
| |
| for (int j = 0; j < numMessages; j++) { |
| int keyIndex = r.nextInt(maxKeys); |
| String key = "key" + keyIndex; |
| byte[] data = ("my-message-" + key + "-" + j).getBytes(); |
| producer.newMessage().key(key).value(data).send(); |
| expected.put(key, data); |
| } |
| |
| producer.flush(); |
| |
| // trigger compaction |
| admin.topics().triggerCompaction(topic); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertEquals(admin.topics().compactionStatus(topic).status, |
| LongRunningProcessStatus.Status.RUNNING); |
| }); |
| |
| // Wait for phase one to complete |
| Thread.sleep(500); |
| |
| // Unload topic make reader of compaction reconnect |
| admin.topics().unload(topic); |
| |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); |
| // Compacted topic ledger should have same number of entry equals to number of unique key. |
| Assert.assertEquals(internalStats.compactedLedger.entries, expected.size()); |
| Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); |
| Assert.assertFalse(internalStats.compactedLedger.offloaded); |
| }); |
| |
| // consumer with readCompacted enabled only get compacted entries |
| try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") |
| .readCompacted(true).subscribe()) { |
| while (true) { |
| Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); |
| Assert.assertEquals(expected.remove(m.getKey()), m.getData()); |
| if (expected.isEmpty()) { |
| break; |
| } |
| } |
| } |
| } |
| } |