HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully cached (#5777)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 855f183..9541939 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -2073,25 +2073,29 @@
// so we need to count all blocks for this file in the backing map under
// a read lock for the block offset
final List<ReentrantReadWriteLock> locks = new ArrayList<>();
- LOG.debug("Notifying caching completed for file {}, with total blocks {}", fileName,
- dataBlockCount);
+ LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
+ fileName, totalBlockCount, dataBlockCount);
try {
final MutableInt count = new MutableInt();
LOG.debug("iterating over {} entries in the backing map", backingMap.size());
backingMap.entrySet().stream().forEach(entry -> {
- if (entry.getKey().getHfileName().equals(fileName.getName())) {
+ if (
+ entry.getKey().getHfileName().equals(fileName.getName())
+ && entry.getKey().getBlockType().equals(BlockType.DATA)
+ ) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName, entry.getKey().getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
lock.readLock().lock();
locks.add(lock);
+ // recheck that the given key is still there (no eviction happened before the lock was acquired)
if (backingMap.containsKey(entry.getKey())) {
count.increment();
}
}
});
- // We may either place only data blocks on the BucketCache or all type of blocks
- if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
+ // BucketCache would only have data blocks
+ if (dataBlockCount == count.getValue()) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
@@ -2100,15 +2104,17 @@
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
+ for (ReentrantReadWriteLock lock : locks) {
+ lock.readLock().unlock();
+ }
LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
+ "and try the verification again.", fileName);
Thread.sleep(100);
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
} else {
- LOG.info(
- "We found only {} blocks cached from a total of {} for file {}, "
- + "but no blocks pending caching. Maybe cache is full?",
- count, dataBlockCount, fileName);
+ LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+ + "but no blocks pending caching. Maybe cache is full or evictions "
+ + "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
}
}
} catch (InterruptedException e) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index eb3e3cc..7303cf5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.Before;
@@ -89,7 +90,7 @@
@Test
public void testBlockEvictionOnRegionMove() throws Exception {
// Write to table and flush
- TableName tableRegionMove = writeDataToTable();
+ TableName tableRegionMove = writeDataToTable("testBlockEvictionOnRegionMove");
HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
@@ -115,7 +116,7 @@
@Test
public void testBlockEvictionOnGracefulStop() throws Exception {
// Write to table and flush
- TableName tableRegionClose = writeDataToTable();
+ TableName tableRegionClose = writeDataToTable("testBlockEvictionOnGracefulStop");
HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
@@ -138,8 +139,8 @@
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}
- public TableName writeDataToTable() throws IOException, InterruptedException {
- TableName tableName = TableName.valueOf("table1");
+ public TableName writeDataToTable(String testName) throws IOException, InterruptedException {
+ TableName tableName = TableName.valueOf(testName + EnvironmentEdgeManager.currentTime());
byte[] row0 = Bytes.toBytes("row1");
byte[] row1 = Bytes.toBytes("row2");
byte[] family = Bytes.toBytes("family");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index a39df7e..d60d2c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -164,20 +164,21 @@
// Load Blocks in cache
Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
HFile.createReader(fs, storeFile, cacheConf, true, conf);
- while (bucketCache.backingMap.size() == 0) {
+ boolean evicted = false;
+ while (!PrefetchExecutor.isCompleted(storeFile)) {
+ if (bucketCache.backingMap.size() > 0 && !evicted) {
+ Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
+ bucketCache.backingMap.entrySet().iterator();
+ // Evict a data block from cache
+ Map.Entry<BlockCacheKey, BucketEntry> entry = it.next();
+ while (it.hasNext() && !evicted) {
+ if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
+ evicted = bucketCache.evictBlock(it.next().getKey());
+ }
+ }
+ }
Thread.sleep(10);
}
- Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
- bucketCache.backingMap.entrySet().iterator();
- // Evict Blocks from cache
- bucketCache.evictBlock(it.next().getKey());
- bucketCache.evictBlock(it.next().getKey());
- int retries = 0;
- while (!PrefetchExecutor.isCompleted(storeFile) && retries < 5) {
- Thread.sleep(500);
- retries++;
- }
- assertTrue(retries < 5);
assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
cleanupBucketCache(bucketCache);
}