HBASE-30038: RefCnt Leak error when caching (#7995)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Pankaj Kumar <pankajkumar@apache.org>
Reviewed-by: Vaibhav Joshi <vjoshi@cloudera.com>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index f7a41a3..8946090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -1171,9 +1171,13 @@
       if (getCacheOnWrite()) {
         cacheConf.getBlockCache().ifPresent(cache -> {
           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
-          cache.cacheBlock(
-            new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
-            blockForCaching);
+          try {
+            cache.cacheBlock(
+              new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
+              blockForCaching);
+          } finally {
+            blockForCaching.release();
+          }
         });
       }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index b8b147f..b635c2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -578,11 +578,11 @@
   private void doCacheOnWrite(long offset) {
     cacheConf.getBlockCache().ifPresent(cache -> {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
-      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
-      if (!shouldCacheBlock(cache, key)) {
-        return;
-      }
       try {
+        BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
+        if (!shouldCacheBlock(cache, key)) {
+          return;
+        }
         cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
       } finally {
         // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index fbc7f3b..bb5ab50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -38,8 +38,10 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -76,6 +78,7 @@
 import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -93,6 +96,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+
 /**
  * test hfile features.
  */
@@ -202,6 +207,60 @@
     lru.shutdown();
   }
 
+  /**
+   * A block that {@code shouldCacheBlock} declines during cache-on-write must still be released:
+   * no RefCnt leak may be reported. The global (static) leak detection level is restored on exit.
+   */
+  @Test
+  public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception {
+    int bufCount = 32;
+    int blockSize = 4 * 1024;
+    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    ResourceLeakDetector.Level previousLevel = ResourceLeakDetector.getLevel();
+    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+    Configuration myConf = HBaseConfiguration.create(conf);
+    myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
+    myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
+    myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);
+    final AtomicInteger counter = new AtomicInteger();
+    RefCnt.detector.setLeakListener((resourceType, records) -> counter.incrementAndGet());
+    BlockCache cache = Mockito.mock(BlockCache.class);
+    Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()))
+      .thenReturn(Optional.of(false));
+    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak");
+    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build();
+
+    try {
+      Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc))
+        .withPath(fs, hfilePath).withFileContext(context).create();
+      try {
+        writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
+          HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")));
+      } finally {
+        writer.close();
+      }
+      Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any());
+      Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(),
+        Mockito.anyBoolean(), Mockito.anyBoolean());
+      for (int i = 0; i < 30 && counter.get() == 0; i++) {
+        System.gc();
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+        alloc.allocate(128 * 1024).release();
+      }
+      assertEquals(0, counter.get());
+    } finally {
+      ResourceLeakDetector.setLevel(previousLevel);
+      fs.delete(hfilePath, false);
+      alloc.clean();
+    }
+  }
+
   private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception {
     assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 7ad83ba..c257b19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -207,15 +207,80 @@
       new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, path.getName(), null);
 
     writeDataBlocksAndCreateIndex(hbw, outputStream, biw);
-
-    System.gc();
-    Thread.sleep(1000);
-
-    allocator.allocate(128 * 1024).release();
-
+    for (int i = 0; i < 30 && counter.get() == 0; i++) {
+      System.gc();
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+      allocator.allocate(128 * 1024).release();
+    }
     assertEquals(0, counter.get());
   }
 
+  /**
+   * Index blocks cached on write while a multi-level (>= 3) index is built must be released:
+   * no RefCnt leak may be reported. The global (static) leak detection level is restored on exit.
+   */
+  @Test
+  public void testIntermediateIndexCacheOnWriteDoesNotLeak() throws Exception {
+    Configuration localConf = new Configuration(TEST_UTIL.getConfiguration());
+    localConf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+    localConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
+    localConf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 4096);
+    localConf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 32);
+    localConf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+    ByteBuffAllocator allocator = ByteBuffAllocator.create(localConf, true);
+    List<ByteBuff> buffers = new ArrayList<>();
+    for (int i = 0; i < allocator.getTotalBufferCount(); i++) {
+      buffers.add(allocator.allocateOneBuffer());
+      assertEquals(0, allocator.getFreeBufferCount());
+    }
+    buffers.forEach(ByteBuff::release);
+    assertEquals(allocator.getTotalBufferCount(), allocator.getFreeBufferCount());
+    ResourceLeakDetector.Level previousLevel = ResourceLeakDetector.getLevel();
+    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+    final AtomicInteger counter = new AtomicInteger();
+    RefCnt.detector.setLeakListener((resourceType, records) -> counter.incrementAndGet());
+    Path localPath = new Path(TEST_UTIL.getDataTestDir(),
+      "block_index_testIntermediateIndexCacheOnWriteDoesNotLeak_" + compr);
+    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true)
+      .withIncludesMvcc(includesMemstoreTS).withIncludesTags(true).withCompression(compr)
+      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+    HFileBlock.Writer hbw =
+      new HFileBlock.Writer(localConf, null, meta, allocator, meta.getBlocksize());
+    FSDataOutputStream outputStream = fs.create(localPath);
+    LruBlockCache cache = new LruBlockCache(8 * 1024 * 1024, 1024, true, localConf);
+    CacheConfig cacheConfig = new CacheConfig(localConf, null, cache, allocator);
+    HFileBlockIndex.BlockIndexWriter biw =
+      new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, localPath.getName(), null);
+    biw.setMaxChunkSize(512);
+
+    try {
+      writeDataBlocksAndCreateIndex(hbw, outputStream, biw);
+      assertTrue(biw.getNumLevels() >= 3);
+      for (int i = 0; i < 30 && counter.get() == 0; i++) {
+        System.gc();
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+        allocator.allocate(128 * 1024).release();
+      }
+      assertEquals(0, counter.get());
+    } finally {
+      ResourceLeakDetector.setLevel(previousLevel);
+      hbw.release();
+      cache.shutdown();
+      allocator.clean();
+      fs.delete(localPath, false);
+    }
+  }
+
   private void clear() throws IOException {
     keys.clear();
     firstKeyInFile = null;