[fix][broker] Fix skip message API when hole messages exists (#20326)
(cherry picked from commit c35b820bb323c8e52bd9cd8ccd29565c23764117)
(cherry picked from commit 967e1e10f1d7a06c862d6a240698685809403d31)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2d2597e..3196a6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1585,7 +1585,6 @@
} finally {
if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
((PositionImplRecyclable) r.lowerEndpoint()).recycle();
- ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
}
}, recyclePositionRangeConverter);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ca88b31..b979df7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,11 +53,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import lombok.Builder;
import lombok.Cleanup;
+import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
@@ -87,6 +89,7 @@
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -896,6 +899,52 @@
assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
}
+ @Test(dataProvider = "topicName")
+ public void testSkipHoleMessages(String topicName) throws Exception {
+ final String subName = topicName;
+ assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
+
+ final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
+ // Force to create a topic
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
+ assertEquals(admin.topics().getList("prop-xyz/ns1"),
+ Collections.singletonList("persistent://prop-xyz/ns1/" + topicName));
+
+ // create consumer and subscription
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getWebServiceAddress())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ AtomicInteger total = new AtomicInteger();
+ Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
+ .messageListener(new MessageListener<byte[]>() {
+ @SneakyThrows
+ @Override
+ public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
+ if (total.get() %2 !=0){
+ // artificially created 50 hollow messages
+ consumer.acknowledge(msg);
+ }
+ total.incrementAndGet();
+ }
+ })
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+ assertEquals(admin.topics().getSubscriptions(persistentTopicName), Collections.singletonList(subName));
+
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
+ TimeUnit.SECONDS.sleep(2);
+ TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+ long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
+ log.info("back={}",msgBacklog);
+ int skipNumber = 20;
+ admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
+ topicStats = admin.topics().getStats(persistentTopicName);
+ assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
+ }
+
@Test(dataProvider = "topicNamesForAllTypes")
public void partitionedTopics(String topicType, String topicName) throws Exception {
final String namespace = "prop-xyz/ns1";