[fix][broker][branch-2.10] limit the memory used by reads end-to-end
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index b946dc0..f3848b6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 5401ae3..3e7401b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -29,7 +29,6 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -72,7 +71,6 @@
     public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
         this.manager = manager;
         this.ml = ml;
-        this.pendingReadsManager = new PendingReadsManager(this);
         this.interceptor = ml.getManagedLedgerInterceptor();
         this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
         this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -281,14 +279,14 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
             final ReadEntriesCallback callback, Object ctx) {
-        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
+        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null);
     }
 
-    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
         final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
 
         final AsyncCallbacks.ReadEntriesCallback callback =
-                handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
+                handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader,
                         originalCallback, ctx, handle);
         if (callback == null) {
             return;
@@ -371,8 +369,10 @@
     private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
                                                                         long firstEntry, long lastEntry,
                                                                         boolean shouldCacheEntry,
-                                                                        AsyncCallbacks.ReadEntriesCallback originalCallback,
-                                                                        Object ctx, InflightReadsLimiter.Handle handle) {
+                                                                        AsyncCallbacks.ReadEntriesCallback
+                                                                                originalCallback,
+                                                                        Object ctx,
+                                                                        InflightReadsLimiter.Handle handle) {
         InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
         if (pendingReadsLimiter.isDisabled()) {
             return originalCallback;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 2b69581..e134512 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 86db5eb..23f6ca9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -527,12 +527,9 @@
         }
         long size = entries.stream().mapToLong(Entry::getLength).sum();
         updatePendingBytesToDispatch(size);
-        if (sendMessagesToConsumers(readType, entries)) {
-            updatePendingBytesToDispatch(-size);
-            readMoreEntriesAsync();
-        } else {
-            updatePendingBytesToDispatch(-size);
-        }    }
+        sendMessagesToConsumers(readType, entries);
+        updatePendingBytesToDispatch(-size);
+    }
 
     protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         sendInProgress = true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 2c90e89..6f826cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -93,11 +93,8 @@
                 .getNextValidPosition((PositionImpl) entry.getPosition()));
         long size = entry.getLength();
         updatePendingBytesToDispatch(size);
-        if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
-            readMoreEntriesAsync();
-        } else {
-            updatePendingBytesToDispatch(-size);
-        }
+        sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+        updatePendingBytesToDispatch(-size);
         ctx.recycle();
     }