HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully cached (#5777)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
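The race fixed here: notifyFileCachingCompleted walks the backing map counting
blocks of every type for the file, then declares the file fully cached if that
count matches either dataBlockCount or totalBlockCount. The double target can
pass spuriously: for a file with 10 data blocks and 2 index blocks
(dataBlockCount = 10, totalBlockCount = 12), finding any 10 cached blocks, say
8 data plus 2 index, satisfies dataBlockCount == count and the file is wrongly
marked fully cached even though two data blocks are missing. The patch counts
DATA blocks only and compares against dataBlockCount alone. A minimal sketch of
the corrected verification, with names taken from the patch (MutableInt is
org.apache.commons.lang3.mutable.MutableInt):

    // Count only the DATA blocks of this file, re-checking each key under its
    // offset read lock so a concurrently evicted block is not counted.
    final MutableInt count = new MutableInt();
    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
    backingMap.keySet().stream()
      .filter(key -> key.getHfileName().equals(fileName.getName()))
      .filter(key -> key.getBlockType().equals(BlockType.DATA))
      .forEach(key -> {
        ReentrantReadWriteLock lock = offsetLock.getLock(key.getOffset());
        lock.readLock().lock();
        locks.add(lock);
        if (backingMap.containsKey(key)) { // still present after locking
          count.increment();
        }
      });
    if (dataBlockCount == count.getValue()) {
      fileCacheCompleted(fileName, size); // every data block is cached
    }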
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 855f183..9541939 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -2073,25 +2073,29 @@
     // so we need to count all blocks for this file in the backing map under
     // a read lock for the block offset
     final List<ReentrantReadWriteLock> locks = new ArrayList<>();
-    LOG.debug("Notifying caching completed for file {}, with total blocks {}", fileName,
-      dataBlockCount);
+    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
+      fileName, totalBlockCount, dataBlockCount);
     try {
       final MutableInt count = new MutableInt();
       LOG.debug("iterating over {} entries in the backing map", backingMap.size());
       backingMap.entrySet().stream().forEach(entry -> {
-        if (entry.getKey().getHfileName().equals(fileName.getName())) {
+        if (
+          entry.getKey().getHfileName().equals(fileName.getName())
+            && entry.getKey().getBlockType().equals(BlockType.DATA)
+        ) {
           LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
             fileName, entry.getKey().getOffset());
           ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
           lock.readLock().lock();
           locks.add(lock);
+          // re-check the key is still present (no eviction raced us before the lock was acquired)
           if (backingMap.containsKey(entry.getKey())) {
             count.increment();
           }
         }
       });
-      // We may either place only data blocks on the BucketCache or all type of blocks
-      if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
+      // Only data blocks were counted above, so the target is the data block count
+      if (dataBlockCount == count.getValue()) {
         LOG.debug("File {} has now been fully cached.", fileName);
         fileCacheCompleted(fileName, size);
       } else {
@@ -2100,15 +2104,18 @@
             + "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
           fileName, count.getValue(), dataBlockCount);
         if (ramCache.hasBlocksForFile(fileName.getName())) {
+          for (ReentrantReadWriteLock lock : locks) {
+            lock.readLock().unlock();
+          }
+          locks.clear();
           LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
             + "and try the verification again.", fileName);
           Thread.sleep(100);
           notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
         } else {
-          LOG.info(
-            "We found only {} blocks cached from a total of {} for file {}, "
-              + "but no blocks pending caching. Maybe cache is full?",
-            count, dataBlockCount, fileName);
+          LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+            + "but no blocks pending caching. Maybe cache is full or evictions "
+            + "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
         }
       }
     } catch (InterruptedException e) {
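A note on the unlock loop added before the retry: the offset read locks
collected during the count are (by the method's existing structure) also
released in a finally block, so once released here they must be dropped from
the list as well; unlocking a ReentrantReadWriteLock read lock a second time
throws IllegalMonitorStateException. Hence the locks.clear() above. A sketch of
the retry path under that assumption (the finally block sits outside this
hunk):

    // Release the read locks before sleeping so cache writer and eviction
    // threads are not blocked on these offsets while we wait.
    for (ReentrantReadWriteLock lock : locks) {
      lock.readLock().unlock();
    }
    // Forget them too: the finally block unlocks whatever remains in "locks".
    locks.clear();
    Thread.sleep(100);
    notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);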
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index eb3e3cc..7303cf5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.Before;
@@ -89,7 +90,7 @@
   @Test
   public void testBlockEvictionOnRegionMove() throws Exception {
     // Write to table and flush
-    TableName tableRegionMove = writeDataToTable();
+    TableName tableRegionMove = writeDataToTable("testBlockEvictionOnRegionMove");
 
     HRegionServer regionServingRS =
       cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
@@ -115,7 +116,7 @@
   @Test
   public void testBlockEvictionOnGracefulStop() throws Exception {
     // Write to table and flush
-    TableName tableRegionClose = writeDataToTable();
+    TableName tableRegionClose = writeDataToTable("testBlockEvictionOnGracefulStop");
 
     HRegionServer regionServingRS =
       cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
@@ -138,8 +139,8 @@
     assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
   }
 
-  public TableName writeDataToTable() throws IOException, InterruptedException {
-    TableName tableName = TableName.valueOf("table1");
+  public TableName writeDataToTable(String testName) throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(testName + EnvironmentEdgeManager.currentTime());
     byte[] row0 = Bytes.toBytes("row1");
     byte[] row1 = Bytes.toBytes("row2");
     byte[] family = Bytes.toBytes("family");
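Taking the test name as a parameter fixes table-name reuse: both tests ran
against the same mini-cluster, so the fixed name "table1" let state leak from
one test into the next. Suffixing the name with
EnvironmentEdgeManager.currentTime() makes every invocation unique, e.g.:

    // Illustrative only: yields something like
    // "testBlockEvictionOnRegionMove1712345678901".
    TableName table = TableName.valueOf(
      "testBlockEvictionOnRegionMove" + EnvironmentEdgeManager.currentTime());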
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index a39df7e..d60d2c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -164,20 +164,21 @@
     // Load Blocks in cache
     Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
     HFile.createReader(fs, storeFile, cacheConf, true, conf);
-    while (bucketCache.backingMap.size() == 0) {
+    boolean evicted = false;
+    while (!PrefetchExecutor.isCompleted(storeFile)) {
+      if (bucketCache.backingMap.size() > 0 && !evicted) {
+        Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
+          bucketCache.backingMap.entrySet().iterator();
+        // Evict the first data block found in the backing map
+        while (it.hasNext() && !evicted) {
+          Map.Entry<BlockCacheKey, BucketEntry> entry = it.next();
+          if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
+            evicted = bucketCache.evictBlock(entry.getKey());
+          }
+        }
+      }
       Thread.sleep(10);
     }
-    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
-      bucketCache.backingMap.entrySet().iterator();
-    // Evict Blocks from cache
-    bucketCache.evictBlock(it.next().getKey());
-    bucketCache.evictBlock(it.next().getKey());
-    int retries = 0;
-    while (!PrefetchExecutor.isCompleted(storeFile) && retries < 5) {
-      Thread.sleep(500);
-      retries++;
-    }
-    assertTrue(retries < 5);
     assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
     cleanupBucketCache(bucketCache);
   }
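The reworked test races the eviction with the prefetch instead of evicting
after it completes: while PrefetchExecutor.isCompleted(storeFile) is still
false, one data block is evicted, so the completion check must come up one data
block short and the file must not land in fullyCachedFiles. The interleaving
idiom, condensed (evictOneDataBlock is a hypothetical stand-in for the iterator
loop above):

    // Perform the concurrent action once while prefetch is still running,
    // then let the prefetch finish before asserting.
    boolean evicted = false;
    while (!PrefetchExecutor.isCompleted(storeFile)) {
      if (!evicted && bucketCache.backingMap.size() > 0) {
        evicted = evictOneDataBlock(bucketCache); // hypothetical helper
      }
      Thread.sleep(10);
    }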