// blob: b1f5de71e5ec774f5966639c6fae1b78b3267029 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
/**
 * Runs once before the tests of this class (TestNG {@code @BeforeClass}) and delegates to the
 * base-class hooks {@code internalSetup()} and {@code producerBaseSetup()}, in that order.
 *
 * @throws Exception if either base-class setup step fails
 */
@Override
@BeforeClass
protected void setup() throws Exception {
    super.internalSetup();
    super.producerBaseSetup();
}
/**
 * Runs once after the tests of this class (TestNG {@code @AfterClass}) and delegates to the
 * base-class {@code internalCleanup()} hook.
 *
 * @throws Exception if the base-class cleanup fails
 */
@Override
@AfterClass
protected void cleanup() throws Exception {
    super.internalCleanup();
}
/**
 * Applies the base-class configuration and then pushes two periodic intervals out to
 * {@link Integer#MAX_VALUE} seconds, so that the scheduled compaction monitor and the scheduled
 * retention check do not run on their own while the tests execute.
 *
 * @throws Exception if the base-class configuration step fails
 */
@Override
protected void doInitConf() throws Exception {
    super.doInitConf();
    final int effectivelyNever = Integer.MAX_VALUE;
    // Scheduled compaction is disabled; tests trigger compaction explicitly.
    conf.setBrokerServiceCompactionMonitorIntervalInSeconds(effectivelyNever);
    // Scheduled retention check is disabled as well.
    conf.setRetentionCheckIntervalInSeconds(effectivelyNever);
}
/**
 * Asks the broker-side topic object directly for its last message id, bypassing the client.
 *
 * @param topicName fully qualified topic name to look up (the lookup does not create the topic)
 * @return the id reported by the topic's {@code getLastMessageId()}, cast to {@link MessageIdImpl}
 * @throws Exception if the lookup or the last-message-id future fails, or the topic is absent
 */
private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception {
    Optional<Topic> lookupResult = pulsar.getBrokerService().getTopic(topicName, false).get();
    Topic topic = lookupResult.get();
    MessageId brokerSideLastId = topic.getLastMessageId().get();
    return (MessageIdImpl) brokerSideLastId;
}
/**
 * Triggers compaction on the given topic and blocks until the compaction subscription's
 * mark-delete position equals the managed ledger's last confirmed entry (same ledger id and
 * same entry id).
 *
 * @param topicName fully qualified name of an already existing persistent topic
 * @throws Exception if the topic lookup fails or compaction cannot be triggered
 */
private void triggerCompactionAndWait(String topicName) throws Exception {
    Optional<Topic> lookupResult = pulsar.getBrokerService().getTopic(topicName, false).get();
    PersistentTopic topic = (PersistentTopic) lookupResult.get();
    topic.triggerCompaction();
    Awaitility.await().untilAsserted(() -> {
        PositionImpl lastConfirmed = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
        PositionImpl compactedUpTo = (PositionImpl) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)
                .getCursor()
                .getMarkDeletedPosition();
        // The wait ends once the compaction cursor has caught up with the last confirmed entry.
        assertEquals(compactedUpTo.getLedgerId(), lastConfirmed.getLedgerId());
        assertEquals(compactedUpTo.getEntryId(), lastConfirmed.getEntryId());
    });
}
/**
 * Unloads the topic through the admin API and then waits until a broker-side lookup returns the
 * topic again with its managed ledger in state {@code LedgerOpened}.
 *
 * <p>NOTE(review): judging by the method name, the reload is expected to make the managed ledger
 * roll over to a new ledger; that is not verified here.
 *
 * @param topicName fully qualified name of the topic to unload
 * @throws Exception if the unload request fails
 */
private void triggerLedgerSwitch(String topicName) throws Exception {
    admin.topics().unload(topicName);
    Awaitility.await().until(() -> {
        CompletableFuture<Optional<Topic>> lookup = pulsar.getBrokerService().getTopic(topicName, false);
        // Only a lookup that finished normally and produced a topic is worth inspecting.
        boolean topicAvailable = lookup.isDone()
                && !lookup.isCompletedExceptionally()
                && lookup.join().isPresent();
        if (!topicAvailable) {
            return false;
        }
        PersistentTopic reloadedTopic = (PersistentTopic) lookup.join().get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) reloadedTopic.getManagedLedger();
        return ManagedLedgerImpl.State.LedgerOpened == ledger.getState();
    });
}
/**
 * Repeatedly asks the managed ledger to trim consumed ledgers until exactly one ledger is left
 * in its ledger info map, giving up after 10 seconds.
 *
 * @param topicName fully qualified name of an already existing persistent topic
 * @throws Exception if the topic lookup fails
 */
private void clearAllTheLedgersOutdated(String topicName) throws Exception {
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
        // Diamond instead of the raw type: avoids an unchecked conversion warning.
        CompletableFuture<Void> future = new CompletableFuture<>();
        managedLedger.trimConsumedLedgersInBackground(future);
        // Wait for this trim round to finish before counting the remaining ledgers.
        future.join();
        return managedLedger.getLedgersInfo().size() == 1;
    });
}
/**
 * A consumer subscribed to a fresh topic, to which nothing has been published, must report
 * {@code -1:-1} as the last message id.
 *
 * @throws Exception if the consumer cannot be created or the topic cannot be deleted
 */
@Test
public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: the consumer is closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName)) {
        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
        assertEquals(messageId.getLedgerId(), -1);
        assertEquals(messageId.getEntryId(), -1);
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * Creates a {@code String} producer for the given topic, with batching switched on or off.
 *
 * <p>When batching is enabled the batch byte limit is raised to {@link Integer#MAX_VALUE} and the
 * maximum publish delay to three hours, presumably so that batches leave the producer only when
 * a test calls {@code flush()}.
 *
 * @param enabledBatch whether the producer batches messages
 * @param topicName fully qualified topic name to publish to
 * @return the created producer; the caller is responsible for closing it
 * @throws Exception if the producer cannot be created
 */
private Producer<String> createProducer(boolean enabledBatch, String topicName) throws Exception {
    ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName)
            .enableBatching(enabledBatch);
    if (enabledBatch) {
        // The byte limit used to be set twice in this chain with the same value; once is enough.
        // NOTE(review): the duplicate may have been meant to be batchingMaxMessages(...) - confirm.
        producerBuilder.batchingMaxBytes(Integer.MAX_VALUE)
                .batchingMaxPublishDelay(3, TimeUnit.HOURS);
    }
    return producerBuilder.create();
}
/**
 * Subscribes a {@code String} consumer with {@code readCompacted(true)} and a receiver queue
 * size of 1.
 *
 * @param topicName fully qualified topic name to subscribe to
 * @param subName subscription name
 * @return the subscribed consumer; the caller is responsible for closing it
 * @throws Exception if the subscription cannot be established
 */
private Consumer<String> createConsumer(String topicName, String subName) throws Exception {
    return pulsarClient.newConsumer(Schema.STRING)
            .readCompacted(true)
            .receiverQueueSize(1)
            .subscriptionName(subName)
            .topic(topicName)
            .subscribe();
}
/**
 * Publishes one message, reads it, unloads the topic and trims ledgers until a single ledger
 * is left, then expects the reader's consumer to report {@code -1:-1} as the last message id.
 *
 * @throws Exception if any client or admin operation fails
 */
@Test
public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: reader and producer are closed even when a step or assertion below fails.
    try (ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName(subName)
                 .receiverQueueSize(1)
                 .startMessageId(MessageId.earliest)
                 .readCompacted(false)
                 .create();
         Producer<String> producer = createProducer(false, topicName)) {
        producer.newMessage().key("k0").value("v0").sendAsync().get();
        reader.readNext();
        triggerLedgerSwitch(topicName);
        clearAllTheLedgersOutdated(topicName);
        MessageIdImpl messageId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
        assertEquals(messageId.getLedgerId(), -1);
        assertEquals(messageId.getEntryId(), -1);
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * TestNG data provider that runs a test twice: once with batching enabled and once with it
 * disabled.
 *
 * @return two single-element rows, {@code true} first and {@code false} second
 */
@DataProvider(name = "enabledBatch")
public Object[][] enabledBatch() {
    final Object[] batchingOn = {true};
    final Object[] batchingOff = {false};
    return new Object[][]{batchingOn, batchingOff};
}
/**
 * Without any compaction, the last message id seen by the consumer must match the one reported
 * by the broker-side topic (ledger id, entry id and, with batching, batch size and batch index).
 *
 * @param enabledBatch whether the producer batches messages
 * @throws Exception if any client or admin operation fails
 */
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: consumer and producer are closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName);
         Producer<String> producer = createProducer(enabledBatch, topicName)) {
        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
        producer.flush();
        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
        producer.flush();
        FutureUtil.waitForAll(sendFutures).join();

        MessageIdImpl lastMessageIdExpected = getLastMessageIdByTopic(topicName);
        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
        if (enabledBatch) {
            BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdExpected;
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize());
            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex());
        }
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * After compaction of a topic whose keys all end with a non-null value, the last message id
 * seen by the consumer must match the one reported by the broker-side topic (ledger id, entry
 * id and, with batching, batch size and batch index).
 *
 * @param enabledBatch whether the producer batches messages
 * @throws Exception if any client or admin operation fails
 */
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: consumer and producer are closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName);
         Producer<String> producer = createProducer(enabledBatch, topicName)) {
        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
        producer.flush();
        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
        producer.flush();
        FutureUtil.waitForAll(sendFutures).join();

        triggerCompactionAndWait(topicName);

        MessageIdImpl lastMessageIdByTopic = getLastMessageIdByTopic(topicName);
        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
        assertEquals(messageId.getLedgerId(), lastMessageIdByTopic.getLedgerId());
        assertEquals(messageId.getEntryId(), lastMessageIdByTopic.getEntryId());
        if (enabledBatch) {
            BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdByTopic;
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize());
            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex());
        }
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * Keys {@code k1} and {@code k2} end with a null value while {@code k0} ends with {@code "v2"}.
 * After compaction the consumer's last message id must equal the id returned when that
 * {@code k0 -> "v2"} message (index 2 of the send list) was published.
 *
 * @param enabledBatch whether the producer batches messages
 * @throws Exception if any client or admin operation fails
 */
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean enabledBatch) throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: consumer and producer are closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName);
         Producer<String> producer = createProducer(enabledBatch, topicName)) {
        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
        producer.flush();
        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value(null).sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync());
        producer.flush();
        FutureUtil.waitForAll(sendFutures).join();

        triggerCompactionAndWait(topicName);

        // Index 2 is the k0 -> "v2" message, the last one published with a non-null value
        // for a key that does not end with null.
        MessageIdImpl lastMessageIdExpected = (MessageIdImpl) sendFutures.get(2).get();
        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
        if (enabledBatch) {
            BatchMessageIdImpl lastBatchMessageIdExpected = (BatchMessageIdImpl) lastMessageIdExpected;
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdExpected.getBatchSize());
            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdExpected.getBatchIndex());
        }
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * Only key {@code k2} ends with a null value; {@code k1} ends with {@code "v2"} in the same
 * flush. After compaction the consumer's last message id must equal the id returned when that
 * {@code k1 -> "v2"} message (index 4 of the send list) was published.
 *
 * @param enabledBatch whether the producer batches messages
 * @throws Exception if any client or admin operation fails
 */
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdAfterCompactionEndWithNullMsg2(boolean enabledBatch) throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: consumer and producer are closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName);
         Producer<String> producer = createProducer(enabledBatch, topicName)) {
        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
        producer.flush();
        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value("v1").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync());
        producer.flush();
        FutureUtil.waitForAll(sendFutures).join();

        triggerCompactionAndWait(topicName);

        // Index 4 is the k1 -> "v2" message; everything published after it belongs to k2,
        // whose final value is null.
        MessageIdImpl lastMessageIdExpected = (MessageIdImpl) sendFutures.get(4).get();
        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
        if (enabledBatch) {
            BatchMessageIdImpl lastBatchMessageIdExpected = (BatchMessageIdImpl) lastMessageIdExpected;
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdExpected.getBatchSize());
            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdExpected.getBatchIndex());
        }
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
/**
 * Every key ({@code k0}, {@code k1}, {@code k2}) ends with a null value. After compaction the
 * consumer must report a non-batch last message id of {@code -1:-1}.
 *
 * @param enabledBatch whether the producer batches messages
 * @throws Exception if any client or admin operation fails
 */
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdAfterCompactionAllNullMsg(boolean enabledBatch) throws Exception {
    String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
    String subName = "sub";
    // try-with-resources: consumer and producer are closed even when an assertion below fails.
    try (Consumer<String> consumer = createConsumer(topicName, subName);
         Producer<String> producer = createProducer(enabledBatch, topicName)) {
        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k0").value(null).sendAsync());
        producer.flush();
        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k1").value(null).sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync());
        sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync());
        producer.flush();
        FutureUtil.waitForAll(sendFutures).join();

        triggerCompactionAndWait(topicName);

        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
        assertFalse(lastMessageId instanceof BatchMessageIdImpl);
        assertEquals(lastMessageId.getLedgerId(), -1);
        assertEquals(lastMessageId.getEntryId(), -1);
    }
    // cleanup.
    admin.topics().delete(topicName, false);
}
}