Limit read-ahead bytes to the size of the read cache (#2647)

* Limit read-ahead bytes to the size of the read cache

* Update bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Co-authored-by: lipenghui <penghui@apache.org>

Co-authored-by: lipenghui <penghui@apache.org>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 0ac898c..94904be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -134,6 +134,8 @@
 
     private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
+    private final long maxReadAheadBytesSize;
+
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
             CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
@@ -155,6 +157,9 @@
         readCacheMaxSize = readCacheSize;
         readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
 
+        // Do not attempt to perform read-ahead more than half the total size of the cache
+        maxReadAheadBytesSize = readCacheMaxSize / 2;
+
         long maxThrottleTimeMillis = conf.getLong(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS,
                 DEFAULT_MAX_THROTTLE_TIME_MILLIS);
         maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
@@ -469,7 +474,9 @@
             int count = 0;
             long size = 0;
 
-            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
+            while (count < readAheadCacheBatchSize
+                    && size < maxReadAheadBytesSize
+                    && currentEntryLogId == firstEntryLogId) {
                 ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation,
                         false /* validateEntry */);