HBASE-28467: Add time-based priority caching checks for cacheOnRead code paths. (#5905)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
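
This change gives CacheConfig#shouldCacheBlockOnRead an overload that, besides the
existing per-category check, asks the configured BlockCache (via
BlockCache#shouldCacheFile) whether the blocks of a given file should be cached at
all. A time-based priority cache can thereby veto cache-on-read for cold files while
hot files keep the existing behaviour. As a rough illustration only (not part of this
patch; the config key, the default age, and how the max timestamp is obtained are all
assumptions), such a cache might vote along these lines:

    import java.util.Optional;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

    final class TimeBasedVoteSketch {
      // Illustrative key/default; the real ones live in the data-tiering code.
      static final String HOT_DATA_AGE_KEY = "hbase.hstore.datatiering.hot.data.age";

      // maxCellTimestamp would come from the file's TIMERANGE metadata.
      static Optional<Boolean> shouldCacheFile(long maxCellTimestamp, Configuration conf) {
        long hotAgeMs = conf.getLong(HOT_DATA_AGE_KEY, TimeUnit.DAYS.toMillis(7));
        boolean hot = EnvironmentEdgeManager.currentTime() - maxCellTimestamp < hotAgeMs;
        return Optional.of(hot); // Optional.empty() would mean "no opinion"
      }
    }
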
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 7fb1f1e..40dc0ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -275,6 +275,19 @@
       || (prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN));
   }
 
+  /**
+   * Variant of {@link #shouldCacheBlockOnRead(BlockCategory)} that additionally lets the
+   * configured block cache vote on whether this particular file's blocks should be cached,
+   * e.g. based on time-based priority of hot versus cold data.
+   */
+  public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo,
+    Configuration conf) {
+    // Default to caching unless the block cache explicitly opts this file out.
+    boolean cacheFileBlock =
+      getBlockCache().flatMap(cache -> cache.shouldCacheFile(hFileInfo, conf)).orElse(true);
+    return shouldCacheBlockOnRead(category) && cacheFileBlock;
+  }
+
   /** Returns true if blocks in this file should be flagged as in-memory */
   public boolean isInMemory() {
     return this.inMemory;
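
Note that the file-level vote can only narrow the category-based decision, never widen
it: a category veto always wins, and when the cache abstains (Optional.empty()) the old
cache-on-read default applies. A minimal, self-contained sketch of that fold (the class
and method names below are invented for illustration):

    import java.util.Optional;

    final class VoteFoldSketch {
      // Mirrors the CacheConfig logic above: abstention falls back to "cache it".
      static boolean decide(boolean categoryAllows, Optional<Boolean> fileVote) {
        return categoryAllows && fileVote.orElse(true);
      }

      public static void main(String[] args) {
        System.out.println(decide(true, Optional.empty()));    // true: no opinion -> cache
        System.out.println(decide(true, Optional.of(false)));  // false: cold file -> skip
        System.out.println(decide(false, Optional.of(true)));  // false: category veto wins
      }
    }
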
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e0585c6..989af7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1193,7 +1193,8 @@
       BlockCacheKey cacheKey =
         new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
 
-      cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
+      cacheBlock &=
+        cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory(), getHFileInfo(), conf);
       HFileBlock cachedBlock =
         getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
       if (cachedBlock != null) {
@@ -1346,14 +1347,15 @@
         }
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
         final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
-        final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
+        final boolean cacheOnRead =
+          cacheConf.shouldCacheBlockOnRead(category, getHFileInfo(), conf);
 
         // Don't need the unpacked block back and we're storing the block in the cache compressed
         if (cacheOnly && cacheCompressed && cacheOnRead) {
           cacheConf.getBlockCache().ifPresent(cache -> {
             LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
             // Cache the block if necessary
-            if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
+            if (cacheable && cacheOnRead) {
               cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
             }
           });
@@ -1366,7 +1368,7 @@
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
-          if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
+          if (cacheable && cacheOnRead) {
             // Using the wait on cache during compaction and prefetching.
             cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
               cacheConf.isInMemory(), cacheOnly);
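
The reader-side refactor above matters for two reasons: the lambdas previously
re-evaluated shouldCacheBlockOnRead(category), which would have bypassed the new
per-file vote, and a lambda can only capture (effectively) final locals anyway. A toy
sketch of the decide-once, capture-in-lambda pattern (names invented for illustration):

    import java.util.Optional;

    final class CaptureOnceSketch {
      static void cacheIfAllowed(boolean cacheable, boolean cacheOnRead) {
        // Decide once up front; every caching path inside the lambda then
        // sees the same, time-aware answer.
        Optional.of("blockCache").ifPresent(cache -> {
          if (cacheable && cacheOnRead) {
            System.out.println("would cache via " + cache);
          }
        });
      }
    }
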
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index fbd88a6..78e03cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -50,12 +51,15 @@
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -560,6 +564,66 @@
     }
   }
 
+  @Test
+  public void testCacheConfigShouldCacheFile() throws Exception {
+    // Evict the files from cache.
+    for (HStoreFile file : hStoreFiles) {
+      file.closeStoreFile(true);
+    }
+    // Verify that shouldCacheBlockOnRead honours the file-level vote from shouldCacheFile.
+    // hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files.
+    // hStoreFiles[3] is a cold file.
+    try {
+      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+        hStoreFiles.get(0).getFileInfo().getHFileInfo(),
+        hStoreFiles.get(0).getFileInfo().getConf()));
+      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+        hStoreFiles.get(1).getFileInfo().getHFileInfo(),
+        hStoreFiles.get(1).getFileInfo().getConf()));
+      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+        hStoreFiles.get(2).getFileInfo().getHFileInfo(),
+        hStoreFiles.get(2).getFileInfo().getConf()));
+      assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+        hStoreFiles.get(3).getFileInfo().getHFileInfo(),
+        hStoreFiles.get(3).getFileInfo().getConf()));
+    } finally {
+      for (HStoreFile file : hStoreFiles) {
+        file.initReader();
+      }
+    }
+  }
+
+  @Test
+  public void testCacheOnReadColdFile() throws Exception {
+    // hStoreFiles[3] is a cold file. Its blocks should not be cached by the readBlock call.
+    HStoreFile hStoreFile = hStoreFiles.get(3);
+    BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA);
+    testCacheOnRead(hStoreFile, cacheKey, 23025, false);
+  }
+
+  @Test
+  public void testCacheOnReadHotFile() throws Exception {
+    // hStoreFiles[0] is a hot file. Its blocks should be cached by the readBlock call.
+    HStoreFile hStoreFile = hStoreFiles.get(0);
+    BlockCacheKey cacheKey =
+      new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA);
+    testCacheOnRead(hStoreFile, cacheKey, 23025, true);
+  }
+
+  private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize,
+    boolean expectedCached) throws Exception {
+    // Execute readBlock, which should cache the block only if the file is hot.
+    hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false,
+      false, false, key.getBlockType(), DataBlockEncoding.NONE);
+    // Validate that a hot block got cached while a cold block did not.
+    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false, BlockType.DATA);
+    if (expectedCached) {
+      assertNotNull(block);
+    } else {
+      assertNull(block);
+    }
+  }
+
   private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
     int expectedColdBlocks) {
     int numHotBlocks = 0, numColdBlocks = 0;
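
For completeness, the checks above only take effect when time-based data tiering is
enabled on the column family, roughly as in the following sketch; the attribute names
and values are assumptions modelled on the data-tiering feature and should be checked
against DataTieringManager:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    final class EnableTieringSketch {
      static ColumnFamilyDescriptor hotColdFamily() {
        // Files whose newest cell is older than the hot-data age are treated
        // as cold and are skipped by cache-on-read.
        return ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
          .setConfiguration("hbase.hstore.datatiering.type", "TIME_RANGE")
          .setConfiguration("hbase.hstore.datatiering.hot.data.age",
            String.valueOf(TimeUnit.DAYS.toMillis(7)))
          .build();
      }
    }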