| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.compaction; |
| |
| import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; |
| import com.github.benmanes.caffeine.cache.AsyncLoadingCache; |
| import com.google.common.collect.Sets; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.IntStream; |
| |
| import lombok.Cleanup; |
| |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.commons.lang3.tuple.Triple; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.client.admin.LongRunningProcessStatus; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProducerBuilder; |
| import org.apache.pulsar.client.api.RawMessage; |
| import org.apache.pulsar.client.api.Reader; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.impl.RawMessageImpl; |
| import org.apache.pulsar.client.impl.ReaderImpl; |
| import org.apache.pulsar.common.api.proto.MessageIdData; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.awaitility.Awaitility; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "broker-compaction") |
| public class CompactedTopicTest extends MockedPulsarServiceBaseTest { |
| private final Random r = new Random(0); |
| |
| @DataProvider(name = "batchEnabledProvider") |
| public Object[][] batchEnabledProvider() { |
| return new Object[][] { |
| { Boolean.FALSE }, |
| { Boolean.TRUE } |
| }; |
| } |
| |
| @BeforeMethod |
| @Override |
| public void setup() throws Exception { |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("use", |
| ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); |
| admin.tenants().createTenant("my-property", |
| new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); |
| admin.namespaces().createNamespace("my-property/use/my-ns"); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| public void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| /** |
| * Build a compacted ledger, and return the id of the ledger, the position of the different |
| * entries in the ledger, and a list of gaps, and the entry which should be returned after the gap. |
| */ |
| private Triple<Long, List<Pair<MessageIdData,Long>>, List<Pair<MessageIdData,Long>>> |
| buildCompactedLedger(BookKeeper bk, int count) |
| throws Exception { |
| LedgerHandle lh = bk.createLedger(1, 1, |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); |
| List<Pair<MessageIdData,Long>> positions = new ArrayList<>(); |
| List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>(); |
| |
| AtomicLong ledgerIds = new AtomicLong(10L); |
| AtomicLong entryIds = new AtomicLong(0L); |
| CompletableFuture.allOf( |
| IntStream.range(0, count).mapToObj((i) -> { |
| List<MessageIdData> idsInGap = new ArrayList<>(); |
| if (r.nextInt(10) == 1) { |
| long delta = r.nextInt(10) + 1; |
| idsInGap.add(new MessageIdData() |
| .setLedgerId(ledgerIds.get()) |
| .setEntryId(entryIds.get() + 1)); |
| ledgerIds.addAndGet(delta); |
| entryIds.set(0); |
| } |
| long delta = r.nextInt(5); |
| if (delta != 0) { |
| idsInGap.add(new MessageIdData() |
| .setLedgerId(ledgerIds.get()) |
| .setEntryId(entryIds.get() + 1)); |
| } |
| MessageIdData id = new MessageIdData() |
| .setLedgerId(ledgerIds.get()) |
| .setEntryId(entryIds.addAndGet(delta + 1)); |
| |
| @Cleanup |
| RawMessage m = new RawMessageImpl(id, Unpooled.EMPTY_BUFFER); |
| |
| CompletableFuture<Void> f = new CompletableFuture<>(); |
| ByteBuf buffer = m.serialize(); |
| |
| lh.asyncAddEntry(buffer, |
| (rc, ledger, eid, ctx) -> { |
| if (rc != BKException.Code.OK) { |
| f.completeExceptionally(BKException.create(rc)); |
| } else { |
| positions.add(Pair.of(id, eid)); |
| idsInGap.forEach((gid) -> idsInGaps.add(Pair.of(gid, eid))); |
| f.complete(null); |
| } |
| }, null); |
| return f; |
| }).toArray(CompletableFuture[]::new)).get(); |
| lh.close(); |
| |
| return Triple.of(lh.getId(), positions, idsInGaps); |
| } |
| |
| @Test |
| public void testEntryLookup() throws Exception { |
| BookKeeper bk = pulsar.getBookKeeperClientFactory().create( |
| this.conf, null, null, Optional.empty(), null); |
| |
| Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData |
| = buildCompactedLedger(bk, 500); |
| |
| List<Pair<MessageIdData, Long>> positions = compactedLedgerData.getMiddle(); |
| List<Pair<MessageIdData, Long>> idsInGaps = compactedLedgerData.getRight(); |
| |
| LedgerHandle lh = bk.openLedger(compactedLedgerData.getLeft(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); |
| long lastEntryId = lh.getLastAddConfirmed(); |
| AsyncLoadingCache<Long,MessageIdData> cache = CompactedTopicImpl.createCache(lh, 50); |
| |
| MessageIdData firstPositionId = positions.get(0).getLeft(); |
| Pair<MessageIdData, Long> lastPosition = positions.get(positions.size() - 1); |
| |
| // check ids before and after ids in compacted ledger |
| Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(0, 0), lastEntryId, cache).get(), |
| Long.valueOf(0)); |
| Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(Long.MAX_VALUE, 0), |
| lastEntryId, cache).get(), |
| Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED)); |
| |
| // entry 0 is never in compacted ledger due to how we generate dummy |
| Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(firstPositionId.getLedgerId(), 0), |
| lastEntryId, cache).get(), |
| Long.valueOf(0)); |
| // check next id after last id in compacted ledger |
| Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(lastPosition.getLeft().getLedgerId(), |
| lastPosition.getLeft().getEntryId() + 1), |
| lastEntryId, cache).get(), |
| Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED)); |
| |
| // shuffle to make cache work hard |
| Collections.shuffle(positions, r); |
| Collections.shuffle(idsInGaps, r); |
| |
| // Check ids we know are in compacted ledger |
| for (Pair<MessageIdData, Long> p : positions) { |
| PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), p.getLeft().getEntryId()); |
| Long got = CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(); |
| Assert.assertEquals(got, p.getRight()); |
| } |
| |
| // Check ids we know are in the gaps of the compacted ledger |
| for (Pair<MessageIdData, Long> gap : idsInGaps) { |
| PositionImpl pos = new PositionImpl(gap.getLeft().getLedgerId(), gap.getLeft().getEntryId()); |
| Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(), gap.getRight()); |
| } |
| } |
| |
| @Test |
| public void testCleanupOldCompactedTopicLedger() throws Exception { |
| BookKeeper bk = pulsar.getBookKeeperClientFactory().create( |
| this.conf, null, null, Optional.empty(), null); |
| |
| LedgerHandle oldCompactedLedger = bk.createLedger(1, 1, |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); |
| oldCompactedLedger.close(); |
| LedgerHandle newCompactedLedger = bk.createLedger(1, 1, |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); |
| newCompactedLedger.close(); |
| |
| // set the compacted topic ledger |
| CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk); |
| compactedTopic.newCompactedLedger(new PositionImpl(1,2), oldCompactedLedger.getId()).get(); |
| |
| // ensure both ledgers still exist, can be opened |
| bk.openLedger(oldCompactedLedger.getId(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close(); |
| bk.openLedger(newCompactedLedger.getId(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close(); |
| |
| // update the compacted topic ledger |
| PositionImpl newHorizon = new PositionImpl(1,3); |
| compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId()).get(); |
| |
| // Make sure the old compacted ledger still exist after the new compacted ledger created. |
| bk.openLedger(oldCompactedLedger.getId(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close(); |
| |
| Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent()); |
| Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(), |
| newCompactedLedger.getId()); |
| Assert.assertTrue(compactedTopic.getCompactionHorizon().isPresent()); |
| Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), newHorizon); |
| compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join(); |
| |
| // old ledger should be deleted, new still there |
| try { |
| bk.openLedger(oldCompactedLedger.getId(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close(); |
| Assert.fail("Should have failed to open old ledger"); |
| } catch (BKException.BKNoSuchLedgerExistsException |
| | BKException.BKNoSuchLedgerExistsOnMetadataServerException e) { |
| // correct, expected behaviour |
| } |
| bk.openLedger(newCompactedLedger.getId(), |
| Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, |
| Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close(); |
| } |
| |
| @Test(dataProvider = "batchEnabledProvider") |
| public void testCompactWithEmptyMessage(boolean batchEnabled) throws Exception { |
| final String key = "1"; |
| byte[] msgBytes = "".getBytes(); |
| final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID(); |
| admin.topics().createPartitionedTopic(topic, 1); |
| final int messages = 10; |
| |
| ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic); |
| if (!batchEnabled) { |
| builder.enableBatching(false); |
| } else { |
| builder.batchingMaxMessages(messages / 2); |
| } |
| Producer<byte[]> producer = builder.create(); |
| |
| List<CompletableFuture<MessageId>> list = new ArrayList<>(messages); |
| for (int i = 0; i < messages; i++) { |
| list.add(producer.newMessage().keyBytes(key.getBytes(Charset.defaultCharset())).value(msgBytes).sendAsync()); |
| } |
| |
| FutureUtil.waitForAll(list).get(); |
| admin.topics().triggerCompaction(topic); |
| |
| boolean succeed = retryStrategically((test) -> { |
| try { |
| return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 10, 200); |
| |
| Assert.assertTrue(succeed); |
| |
| // Send more messages and trigger compaction again. |
| |
| list.clear(); |
| for (int i = 0; i < messages; i++) { |
| list.add(producer.newMessage().key(key).value(msgBytes).sendAsync()); |
| } |
| |
| FutureUtil.waitForAll(list).get(); |
| admin.topics().triggerCompaction(topic); |
| |
| succeed = retryStrategically((test) -> { |
| try { |
| return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 10, 200); |
| Assert.assertTrue(succeed); |
| |
| producer.close(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testReadMessageFromCompactedLedger() throws Exception { |
| final String key = "1"; |
| String msg = "test compaction msg"; |
| final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID(); |
| admin.topics().createPartitionedTopic(topic, 1); |
| final int numMessages = 10; |
| |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); |
| for (int i = 0; i < numMessages; ++i) { |
| producer.newMessage().key(key).value(msg).send(); |
| } |
| |
| admin.topics().triggerCompaction(topic); |
| boolean succeed = retryStrategically((test) -> { |
| try { |
| return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 10, 200); |
| |
| Assert.assertTrue(succeed); |
| |
| |
| final String newKey = "2"; |
| String newMsg = "test compaction msg v2"; |
| for (int i = 0; i < numMessages; ++i) { |
| producer.newMessage().key(newKey).value(newMsg).send(); |
| } |
| |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .subscriptionName("test") |
| .readCompacted(true) |
| .startMessageId(MessageId.earliest) |
| .create(); |
| |
| int compactedMsgCount = 0; |
| int nonCompactedMsgCount = 0; |
| while (reader.hasMessageAvailable()) { |
| Message<String> message = reader.readNext(); |
| if (key.equals(message.getKey()) && msg.equals(message.getValue())) { |
| compactedMsgCount++; |
| } else if (newKey.equals(message.getKey()) && newMsg.equals(message.getValue())) { |
| nonCompactedMsgCount++; |
| } |
| } |
| |
| Assert.assertEquals(compactedMsgCount, 1); |
| Assert.assertEquals(nonCompactedMsgCount, numMessages); |
| } |
| |
| @Test |
| public void testLastMessageIdForCompactedLedger() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + UUID.randomUUID(); |
| final String key = "1"; |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); |
| final int numMessages = 10; |
| final String msg = "test compaction msg"; |
| for (int i = 0; i < numMessages; ++i) { |
| producer.newMessage().key(key).value(msg).send(); |
| } |
| admin.topics().triggerCompaction(topic); |
| boolean succeed = retryStrategically((test) -> { |
| try { |
| return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 10, 200); |
| |
| Assert.assertTrue(succeed); |
| |
| PersistentTopicInternalStats stats0 = admin.topics().getInternalStats(topic); |
| admin.topics().unload(topic); |
| PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic); |
| // Make sure the ledger rollover has triggered. |
| Assert.assertTrue(stats0.currentLedgerSize != stats1.currentLedgerSize); |
| |
| Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get(); |
| Assert.assertTrue(topicRef.isPresent()); |
| PersistentTopic persistentTopic = (PersistentTopic) topicRef.get(); |
| ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger(); |
| managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); |
| |
| Awaitility.await().untilAsserted(() -> { |
| Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0); |
| Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1); |
| Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1); |
| }); |
| |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .subscriptionName("test") |
| .readCompacted(true) |
| .startMessageId(MessageId.earliest) |
| .create(); |
| |
| Assert.assertTrue(reader.hasMessageAvailable()); |
| Message<String> received = reader.readNext(); |
| Assert.assertEquals(msg, received.getValue()); |
| MessageId messageId = ((ReaderImpl<String>) reader).getConsumer().getLastMessageId(); |
| Assert.assertEquals(messageId, received.getMessageId()); |
| Assert.assertFalse(reader.hasMessageAvailable()); |
| reader.close(); |
| |
| // Unload the topic again to simulate entry ID with -1 after all data has been compacted. |
| admin.topics().unload(topic); |
| PersistentTopicInternalStats stats2 = admin.topics().getInternalStats(topic); |
| Assert.assertTrue(stats2.lastConfirmedEntry.endsWith(":-1")); |
| Assert.assertTrue(stats2.compactedLedger.ledgerId > 0); |
| |
| reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .subscriptionName("test") |
| .readCompacted(true) |
| .startMessageId(MessageId.earliest) |
| .create(); |
| Assert.assertTrue(reader.hasMessageAvailable()); |
| reader.readNext(); |
| Assert.assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| @Test |
| public void testDoNotLossTheLastCompactedLedgerData() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" + |
| UUID.randomUUID(); |
| final int numMessages = 2000; |
| final int keys = 200; |
| final String msg = "Test"; |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .blockIfQueueFull(true) |
| .maxPendingMessages(numMessages) |
| .enableBatching(false) |
| .create(); |
| CompletableFuture<MessageId> lastMessage = null; |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, keys); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| admin.topics().unload(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertEquals(stats.ledgers.size(), 1); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| admin.topics().unload(topic); |
| // Send one more key to and then to trigger the compaction |
| producer.newMessage().key(keys + "").value(msg).send(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertEquals(stats.compactedLedger.entries, keys + 1); |
| }); |
| |
| // Make sure the reader can get all data from the compacted ledger and original ledger. |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .create(); |
| int received = 0; |
| while (reader.hasMessageAvailable()) { |
| reader.readNext(); |
| received++; |
| } |
| Assert.assertEquals(received, keys + 1); |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test |
| public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" + |
| UUID.randomUUID(); |
| final int numMessages = 2000; |
| final int keys = 200; |
| final String msg = "Test"; |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .blockIfQueueFull(true) |
| .maxPendingMessages(numMessages) |
| .enableBatching(false) |
| .create(); |
| CompletableFuture<MessageId> lastMessage = null; |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, keys); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| // Send more 200 keys |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| |
| // Make sure we have more than 1 original ledgers |
| admin.topics().unload(topic); |
| Awaitility.await().untilAsserted(() -> { |
| Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2); |
| }); |
| |
| // Start a new reader to reading messages |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .receiverQueueSize(10) |
| .create(); |
| |
| // Send more 200 keys |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key((i % keys + keys * 2) + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, keys * 3); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| |
| // The reader should read all 600 keys |
| int received = 0; |
| while (reader.hasMessageAvailable()) { |
| reader.readNext(); |
| received++; |
| } |
| Assert.assertEquals(received, keys * 3); |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test(timeOut = 120000) |
| public void testCompactionWithTopicUnloading() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" + |
| UUID.randomUUID(); |
| final int numMessages = 2000; |
| final int keys = 500; |
| final String msg = "Test"; |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .blockIfQueueFull(true) |
| .maxPendingMessages(numMessages) |
| .enableBatching(false) |
| .create(); |
| CompletableFuture<MessageId> lastMessage = null; |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().pollInterval(5, TimeUnit.SECONDS).untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, keys); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| |
| admin.topics().unload(topic); |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Thread.sleep(100); |
| admin.topics().unload(topic); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().pollInterval(3, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, keys * 2); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| }); |
| |
| // Start a new reader to reading messages |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .receiverQueueSize(10) |
| .create(); |
| |
| // The reader should read all 600 keys |
| int received = 0; |
| while (reader.hasMessageAvailable()) { |
| reader.readNext(); |
| received++; |
| } |
| Assert.assertEquals(received, keys * 2); |
| reader.close(); |
| producer.close(); |
| } |
| |
| @Test(timeOut = 1000 * 30) |
| public void testReader() throws Exception { |
| final String ns = "my-property/use/my-ns"; |
| String topic = "persistent://" + ns + "/t1"; |
| |
| @Cleanup |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topic) |
| .create(); |
| |
| producer.newMessage().key("k").value(("value").getBytes()).send(); |
| producer.newMessage().key("k").value(null).send(); |
| pulsar.getCompactor().compact(topic).get(); |
| |
| Awaitility.await() |
| .pollInterval(3, TimeUnit.SECONDS) |
| .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { |
| admin.topics().unload(topic); |
| Thread.sleep(100); |
| Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1")); |
| }); |
| // Make sure the last confirm entry is -1, then get last message id from compact ledger |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic); |
| Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1")); |
| // Because the latest value of the key `k` is null, so there is no data in compact ledger. |
| Assert.assertEquals(internalStats.compactedLedger.size, 0); |
| |
| @Cleanup |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic(topic) |
| .startMessageIdInclusive() |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .create(); |
| Assert.assertFalse(reader.hasMessageAvailable()); |
| } |
| |
| @Test |
| public void testHasMessageAvailableWithNullValueMessage() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testHasMessageAvailable-" + |
| UUID.randomUUID(); |
| final int numMessages = 10; |
| @Cleanup |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .blockIfQueueFull(true) |
| .enableBatching(false) |
| .create(); |
| CompletableFuture<MessageId> lastMessage = null; |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync(); |
| } |
| |
| for (int i = numMessages / 2; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i + "").value(null).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, numMessages / 2); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); |
| }); |
| |
| @Cleanup |
| Reader<byte[]> reader = pulsarClient.newReader() |
| .topic(topic) |
| .startMessageIdInclusive() |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .create(); |
| for (int i = numMessages / 2; i < numMessages; ++i) { |
| reader.readNext(); |
| } |
| Assert.assertFalse(reader.hasMessageAvailable()); |
| Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); |
| } |
| |
| public void testReadCompleteMessagesDuringTopicUnloading() throws Exception { |
| String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" + |
| UUID.randomUUID(); |
| final int numMessages = 1000; |
| @Cleanup |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topic) |
| .blockIfQueueFull(true) |
| .enableBatching(false) |
| .create(); |
| CompletableFuture<MessageId> lastMessage = null; |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| admin.topics().triggerCompaction(topic); |
| Awaitility.await().untilAsserted(() -> { |
| PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); |
| Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); |
| Assert.assertEquals(stats.compactedLedger.entries, numMessages); |
| Assert.assertEquals(admin.topics().getStats(topic) |
| .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); |
| Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); |
| }); |
| // Unload the topic to make sure the original ledger been deleted. |
| admin.topics().unload(topic); |
| // Produce more messages to the original topic |
| for (int i = 0; i < numMessages; ++i) { |
| lastMessage = producer.newMessage().key(i + numMessages + "").value(String.format("msg [%d]", i + numMessages)).sendAsync(); |
| } |
| producer.flush(); |
| lastMessage.join(); |
| // For now the topic has 1000 messages in the compacted ledger and 1000 messages in the original topic. |
| @Cleanup |
| Reader<String> reader = pulsarClient.newReader(Schema.STRING) |
| .topic(topic) |
| .startMessageIdInclusive() |
| .startMessageId(MessageId.earliest) |
| .readCompacted(true) |
| .create(); |
| |
| // Unloading the topic during reading the data to make sure the reader will not miss any messages. |
| for (int i = 0; i < numMessages / 2; ++i) { |
| Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i)); |
| } |
| admin.topics().unload(topic); |
| for (int i = 0; i < numMessages / 2; ++i) { |
| Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages / 2)); |
| } |
| admin.topics().unload(topic); |
| for (int i = 0; i < numMessages; ++i) { |
| Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages)); |
| } |
| } |
| } |