blob: b6117aab95e08182a74b7fb6d294fb89526539c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.awaitility.Awaitility;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
public class TopicDuplicationTest extends ProducerConsumerBase {
private final String testTenant = "my-property";
private final String testNamespace = "my-ns";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";
@BeforeMethod
@Override
protected void setup() throws Exception {
resetConfig();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setBrokerDeduplicationEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test(timeOut = 10000)
public void testDuplicationApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Boolean enabled = admin.topics().getDeduplicationEnabled(topicName);
assertNull(enabled);
admin.topics().enableDeduplication(topicName, true);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(()-> admin.topics().getDeduplicationEnabled(topicName) != null);
Assert.assertEquals(admin.topics().getDeduplicationEnabled(topicName), true);
admin.topics().disableDeduplication(topicName);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(()-> admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
assertNull(admin.topics().getDeduplicationEnabled(topicName));
}
@Test(timeOut = 10000)
public void testTopicDuplicationApi2() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Boolean enabled = admin.topics().getDeduplicationStatus(topicName);
assertNull(enabled);
admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> admin.topics().getDeduplicationStatus(topicName) != null);
Assert.assertEquals(admin.topics().getDeduplicationStatus(topicName), true);
admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
assertNull(admin.topics().getDeduplicationStatus(topicName));
}
@Test(timeOut = 10000)
public void testTopicDuplicationAppliedApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
waitCacheInit(topicName);
assertNull(admin.namespaces().getDeduplicationStatus(myNamespace));
assertNull(admin.topics().getDeduplicationStatus(topicName));
assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(),
conf.isBrokerDeduplicationEnabled());
admin.namespaces().setDeduplicationStatus(myNamespace, false);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getDeduplicationStatus(topicName, true)));
admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
admin.namespaces().removeDeduplicationStatus(myNamespace);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(),
conf.isBrokerDeduplicationEnabled()));
}
@Test(timeOut = 30000)
public void testDeduplicationPriority() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
final int maxMsgNum = 5;
waitCacheInit(topicName);
//1) Start up producer and send msg.We specified the max sequenceId
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
//broker-level deduplication is enabled in setup() by default
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
//disabled in namespace-level
admin.namespaces().setDeduplicationStatus(myNamespace, false);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationDisabled(producerName, messageDeduplication);
//enabled in topic-level
admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getDeduplicationStatus(topicName)));
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
long maxSeq2 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq2);
//remove topic-level, use namespace-level
admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getDeduplicationStatus(topicName)));
Awaitility.await().untilAsserted(() -> assertFalse(messageDeduplication.isEnabled()));
producer.newMessage().value("msg").sequenceId(1).send();
checkDeduplicationDisabled(producerName, messageDeduplication);
//remove namespace-level , use broker-level
admin.namespaces().removeDeduplicationStatus(myNamespace);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
long maxSeq3 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq3);
}
private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer) throws Exception{
long seq = System.nanoTime();
for (int i = 0; i <= maxMsgNum; i++) {
producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
}
return seq + maxMsgNum;
}
private void checkDeduplicationDisabled(String producerName, MessageDeduplication messageDeduplication) throws Exception {
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
}).get();
}
private void checkDeduplicationEnabled(String producerName, MessageDeduplication messageDeduplication,
long maxSeq) throws Exception {
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertNotNull(messageDeduplication.highestSequencedPersisted);
assertNotNull(messageDeduplication.highestSequencedPushed);
long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
assertEquals(seqId, maxSeq);
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
}).get();
}
@Test(timeOut = 10000)
public void testDuplicationSnapshotApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer interval = admin.topics().getDeduplicationSnapshotInterval(topicName);
assertNull(interval);
admin.topics().setDeduplicationSnapshotInterval(topicName, 1024);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(()-> admin.topics().getDeduplicationSnapshotInterval(topicName) != null);
Assert.assertEquals(admin.topics().getDeduplicationSnapshotInterval(topicName).intValue(), 1024);
admin.topics().removeDeduplicationSnapshotInterval(topicName);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(()-> admin.topics().getDeduplicationSnapshotInterval(topicName) == null);
assertNull(admin.topics().getDeduplicationSnapshotInterval(topicName));
}
@Test(timeOut = 30000)
public void testTopicPolicyTakeSnapshot() throws Exception {
super.internalCleanup();
resetConfig();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalSetup();
super.producerBaseSetup();
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
waitCacheInit(topicName);
admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5);
int msgNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
}
countDownLatch.await();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
long seqId = persistentTopic.getMessageDeduplication().highestSequencedPersisted.get(producerName);
PositionImpl position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
//The first time, use topic-leve policies, 1 second delay + 3 second interval
Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum - 1);
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
assertEquals(position, markDeletedPosition);
//remove topic-level policies, namespace-level should be used, interval becomes 5 seconds
admin.topics().removeDeduplicationSnapshotInterval(topicName);
producer.newMessage().value("msg").send();
//zk update time + 5 second interval time
Awaitility.await().atMost( 7, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(msgNum, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);
//4 remove namespace-level policies, broker-level should be used, interval becomes 3 seconds
admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
Awaitility.await().atMost(4, TimeUnit.SECONDS)
.until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null));
producer.newMessage().value("msg").send();
//ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated
Thread.sleep(3000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
// broker-level interval is 7 seconds, so 3 seconds will not take a snapshot
assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId());
assertNotEquals(position, markDeletedPosition);
// wait for scheduler
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum + 1);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(msgNum + 1, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);
}
@Test(timeOut = 20000)
public void testDuplicationMethod() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
final int maxMsgNum = 100;
admin.topics().createPartitionedTopic(testTopic, 3);
waitCacheInit(topicName);
//1) Start up producer and send msg.We specified the max sequenceId
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
//2) Max sequenceId should be recorded correctly
CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopics().get(topicName);
Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
PersistentTopic persistentTopic = (PersistentTopic) topic;
MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
//3) disable the deduplication check
admin.topics().enableDeduplication(topicName, false);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> admin.topics().getDeduplicationEnabled(topicName) != null);
for (int i = 0; i < 100; i++) {
producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send();
}
//4) Max sequenceId record should be clear
checkDeduplicationDisabled(producerName, messageDeduplication);
}
@Test(timeOut = 40000)
public void testDuplicationSnapshot() throws Exception {
testTakeSnapshot(true);
testTakeSnapshot(false);
}
private void testTakeSnapshot(boolean enabledSnapshot) throws Exception {
super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(enabledSnapshot ? 1 : 0);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalSetup();
super.producerBaseSetup();
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
int msgNum = 50;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
}
countDownLatch.await();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
long seqId = persistentTopic.getMessageDeduplication().highestSequencedPersisted.get(producerName);
PositionImpl position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
Thread.sleep(2000);
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
if (enabledSnapshot) {
assertEquals(position, markDeletedPosition);
} else {
assertNotEquals(position, markDeletedPosition);
assertNotEquals(markDeletedPosition.getEntryId(), -1);
}
producer.newMessage().value("msg").send();
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertNotEquals(msgNum, markDeletedPosition.getEntryId());
assertNotNull(position);
Thread.sleep(2000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
if (enabledSnapshot) {
assertEquals(msgNum, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);
} else {
assertNotEquals(msgNum, markDeletedPosition.getEntryId());
assertNotEquals(position, markDeletedPosition);
}
}
@Test(timeOut = 30000)
public void testNamespacePolicyApi() throws Exception {
Integer interval = admin.namespaces().getDeduplicationSnapshotInterval(myNamespace);
assertNull(interval);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 100);
interval = admin.namespaces().getDeduplicationSnapshotInterval(myNamespace);
assertEquals(interval.intValue(), 100);
admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
interval = admin.namespaces().getDeduplicationSnapshotInterval(myNamespace);
assertNull(interval);
admin.namespaces().setDeduplicationSnapshotIntervalAsync(myNamespace, 200).get();
interval = admin.namespaces().getDeduplicationSnapshotIntervalAsync(myNamespace).get();
assertEquals(interval.intValue(), 200);
admin.namespaces().removeDeduplicationSnapshotIntervalAsync(myNamespace).get();
interval = admin.namespaces().getDeduplicationSnapshotIntervalAsync(myNamespace).get();
assertNull(interval);
}
@Test(timeOut = 30000)
public void testNamespacePolicyTakeSnapshot() throws Exception {
super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(3);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalSetup();
super.producerBaseSetup();
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 1);
int msgNum = 50;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
}
countDownLatch.await();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
long seqId = persistentTopic.getMessageDeduplication().highestSequencedPersisted.get(producerName);
PositionImpl position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
//The first time, 1 second delay + 1 second interval
Awaitility.await().atMost(2500,TimeUnit.MILLISECONDS).until(()-> ((PositionImpl) persistentTopic
.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == msgNum -1);
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
assertEquals(position, markDeletedPosition);
//remove namespace-level policies, broker-level should be used
admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
Thread.sleep(2000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertNotEquals(msgNum - 1, markDeletedPosition.getEntryId());
assertNotEquals(position, markDeletedPosition.getEntryId());
//3 seconds total
Thread.sleep(1000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(msgNum - 1, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);
}
@Test(timeOut = 30000)
public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalSetup();
super.producerBaseSetup();
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
//set value to 0
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 0);
int msgNum = 50;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
}
countDownLatch.await();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
long seqId = persistentTopic.getMessageDeduplication().highestSequencedPersisted.get(producerName);
PositionImpl position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
Awaitility.await().atMost(2,TimeUnit.SECONDS).until(()-> ((PositionImpl) persistentTopic
.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == -1);
// take snapshot is disabled, so markDeletedPosition should not change
assertEquals(markDeletedPosition, managedCursor.getMarkDeletedPosition());
assertEquals(markDeletedPosition.getEntryId(), -1);
assertNotEquals(position, markDeletedPosition);
}
private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
}
}