[fix][broker] Fix skip message API when hole messages exists (#20326)

(cherry picked from commit c35b820bb323c8e52bd9cd8ccd29565c23764117)
(cherry picked from commit 967e1e10f1d7a06c862d6a240698685809403d31)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2d2597e..3196a6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1585,7 +1585,6 @@
                 } finally {
                     if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
                         ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
-                        ((PositionImplRecyclable) r.upperEndpoint()).recycle();
                     }
                 }
             }, recyclePositionRangeConverter);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ca88b31..b979df7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,11 +53,13 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response.Status;
 import lombok.Builder;
 import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
@@ -87,6 +89,7 @@
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -896,6 +899,52 @@
         assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
     }
 
+    @Test(dataProvider = "topicName")
+    public void testSkipHoleMessages(String topicName) throws Exception {
+        final String subName = topicName;
+        assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
+
+        final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
+        // Force to create a topic
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                Collections.singletonList("persistent://prop-xyz/ns1/" + topicName));
+
+        // create consumer and subscription
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        AtomicInteger total = new AtomicInteger();
+        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
+                .messageListener(new MessageListener<byte[]>() {
+                    @SneakyThrows
+                    @Override
+                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
+                        if (total.get() %2 !=0){
+                            // artificially created 50 hollow messages
+                            consumer.acknowledge(msg);
+                        }
+                        total.incrementAndGet();
+                    }
+                })
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), Collections.singletonList(subName));
+
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
+        TimeUnit.SECONDS.sleep(2);
+        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+        long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
+        log.info("back={}",msgBacklog);
+        int skipNumber = 20;
+        admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
+        topicStats = admin.topics().getStats(persistentTopicName);
+        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
+    }
+
     @Test(dataProvider = "topicNamesForAllTypes")
     public void partitionedTopics(String topicType, String topicName) throws Exception {
         final String namespace = "prop-xyz/ns1";