HBASE-28840: Optimise memory utilization during bucketcache retrieval from persistence. (#6253)

Instead of accumulating every backing map chunk in a list and parsing them all
at the end, parse and merge each chunk into the cache as soon as it is read
from the persistence file, so only one chunk is held in memory at a time.

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 8c1a687..c236886 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -1590,8 +1590,7 @@
     }
   }
 
-  private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
-    List<BucketCacheProtos.BackingMap> chunks) throws IOException {
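+  /**
+   * Parses the first persisted chunk, which carries the deserializer map, the first portion of
+   * the backing map and the fully cached files map, and merges it into the in-memory state.
+   */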
+  private void parseFirstChunk(BucketCacheProtos.BucketCacheEntry firstChunk) throws IOException {
     fullyCachedFiles.clear();
     Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
       BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
@@ -1599,22 +1598,14 @@
     backingMap.putAll(pair.getFirst());
     blocksByHFile.addAll(pair.getSecond());
     fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));
+  }
 
-    LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", backingMap.size(),
-      fullyCachedFiles.size());
-    int i = 1;
-    for (BucketCacheProtos.BackingMap chunk : chunks) {
-      Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
-        BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
-      backingMap.putAll(pair2.getFirst());
-      blocksByHFile.addAll(pair2.getSecond());
-      LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", ++i,
-        backingMap.size(), fullyCachedFiles.size());
-    }
-    verifyFileIntegrity(firstChunk);
-    verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
-      firstChunk.getMapClass());
-    updateRegionSizeMapWhileRetrievingFromFile();
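+  /**
+   * Parses a single subsequent backing map chunk and merges its entries into backingMap and
+   * blocksByHFile.
+   */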
+  private void parseChunkPB(BucketCacheProtos.BackingMap chunk,
+    Map<Integer, String> deserializers) throws IOException {
+    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
+      BucketProtoUtils.fromPB(deserializers, chunk, this::createRecycler);
+    backingMap.putAll(pair.getFirst());
+    blocksByHFile.addAll(pair.getSecond());
   }
 
   private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
@@ -1669,18 +1660,22 @@
 
     LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
 
-    ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
     // Read the first chunk that has all the details.
     BucketCacheProtos.BucketCacheEntry firstChunk =
       BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
+    parseFirstChunk(firstChunk);
 
     // Subsequent chunks have the backingMap entries.
     for (int i = 1; i < numChunks; i++) {
       LOG.info("Reading chunk no: {}", i + 1);
-      bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
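+      // Parse and merge the chunk as soon as it is read, so only one chunk is held in memory.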
+      parseChunkPB(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
+        firstChunk.getDeserializersMap());
       LOG.info("Retrieved chunk: {}", i + 1);
     }
-    parsePB(firstChunk, bucketCacheMaps);
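+    // All chunks merged; now validate file integrity, capacity and the configured classes.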
+    verifyFileIntegrity(firstChunk);
+    verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
+      firstChunk.getMapClass());
+    updateRegionSizeMapWhileRetrievingFromFile();
   }
 
   /**