HBASE-30038: RefCnt Leak error when caching (#7995)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Pankaj Kumar <pankajkumar@apache.org>
Reviewed-by: Vaibhav Joshi <vjoshi@cloudera.com>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index f7a41a3..8946090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -1171,9 +1171,13 @@
if (getCacheOnWrite()) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
- cache.cacheBlock(
- new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
- blockForCaching);
+ try {
+ cache.cacheBlock(
+ new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
+ blockForCaching);
+ } finally {
+ blockForCaching.release();
+ }
});
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index b8b147f..b635c2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -578,11 +578,11 @@
private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
- BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
- if (!shouldCacheBlock(cache, key)) {
- return;
- }
try {
+ BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
+ if (!shouldCacheBlock(cache, key)) {
+ return;
+ }
cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index fbc7f3b..bb5ab50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -38,8 +38,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -76,6 +78,7 @@
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -93,6 +96,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+
/**
* test hfile features.
*/
@@ -202,6 +207,60 @@
lru.shutdown();
}
+ @Test
+ public void testWriterCacheOnWriteSkipDoesNotLeak() throws Exception {
+ int bufCount = 32;
+ int blockSize = 4 * 1024;
+ ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+ fillByteBuffAllocator(alloc, bufCount);
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+ Configuration myConf = HBaseConfiguration.create(conf);
+ myConf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
+ myConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
+ myConf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);
+ final AtomicInteger counter = new AtomicInteger();
+ RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
+ @Override
+ public void onLeak(String s, String s1) {
+ counter.incrementAndGet();
+ }
+ });
+ BlockCache cache = Mockito.mock(BlockCache.class);
+ Mockito.when(cache.shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any()))
+ .thenReturn(Optional.of(false));
+ Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testWriterCacheOnWriteSkipDoesNotLeak");
+ HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).build();
+
+ try {
+ Writer writer = new HFile.WriterFactory(myConf, new CacheConfig(myConf, null, cache, alloc))
+ .withPath(fs, hfilePath).withFileContext(context).create();
+ try {
+ writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")));
+ } finally {
+ writer.close();
+ }
+
+ Mockito.verify(cache).shouldCacheBlock(Mockito.any(), Mockito.anyLong(), Mockito.any());
+ Mockito.verify(cache, Mockito.never()).cacheBlock(Mockito.any(), Mockito.any(),
+ Mockito.anyBoolean(), Mockito.anyBoolean());
+ for (int i = 0; i < 30 && counter.get() == 0; i++) {
+ System.gc();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ alloc.allocate(128 * 1024).release();
+ }
+ assertEquals(0, counter.get());
+ } finally {
+ fs.delete(hfilePath, false);
+ alloc.clean();
+ }
+ }
+
private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws Exception {
assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 7ad83ba..c257b19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -207,15 +207,80 @@
new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, path.getName(), null);
writeDataBlocksAndCreateIndex(hbw, outputStream, biw);
-
- System.gc();
- Thread.sleep(1000);
-
- allocator.allocate(128 * 1024).release();
-
+ for (int i = 0; i < 30 && counter.get() == 0; i++) {
+ System.gc();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ allocator.allocate(128 * 1024).release();
+ }
assertEquals(0, counter.get());
}
+ @Test
+ public void testIntermediateIndexCacheOnWriteDoesNotLeak() throws Exception {
+ Configuration localConf = new Configuration(TEST_UTIL.getConfiguration());
+ localConf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+ localConf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
+ localConf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 4096);
+ localConf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 32);
+ localConf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+ ByteBuffAllocator allocator = ByteBuffAllocator.create(localConf, true);
+ List<ByteBuff> buffers = new ArrayList<>();
+ for (int i = 0; i < allocator.getTotalBufferCount(); i++) {
+ buffers.add(allocator.allocateOneBuffer());
+ assertEquals(0, allocator.getFreeBufferCount());
+ }
+ buffers.forEach(ByteBuff::release);
+ assertEquals(allocator.getTotalBufferCount(), allocator.getFreeBufferCount());
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+ final AtomicInteger counter = new AtomicInteger();
+ RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
+ @Override
+ public void onLeak(String s, String s1) {
+ counter.incrementAndGet();
+ }
+ });
+
+ Path localPath = new Path(TEST_UTIL.getDataTestDir(),
+ "block_index_testIntermediateIndexCacheOnWriteDoesNotLeak_" + compr);
+ HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true)
+ .withIncludesMvcc(includesMemstoreTS).withIncludesTags(true).withCompression(compr)
+ .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+ HFileBlock.Writer hbw =
+ new HFileBlock.Writer(localConf, null, meta, allocator, meta.getBlocksize());
+ FSDataOutputStream outputStream = fs.create(localPath);
+ LruBlockCache cache = new LruBlockCache(8 * 1024 * 1024, 1024, true, localConf);
+ CacheConfig cacheConfig = new CacheConfig(localConf, null, cache, allocator);
+ HFileBlockIndex.BlockIndexWriter biw =
+ new HFileBlockIndex.BlockIndexWriter(hbw, cacheConfig, localPath.getName(), null);
+ biw.setMaxChunkSize(512);
+
+ try {
+ writeDataBlocksAndCreateIndex(hbw, outputStream, biw);
+ assertTrue(biw.getNumLevels() >= 3);
+ for (int i = 0; i < 30 && counter.get() == 0; i++) {
+ System.gc();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ allocator.allocate(128 * 1024).release();
+ }
+ assertEquals(0, counter.get());
+ } finally {
+ hbw.release();
+ cache.shutdown();
+ allocator.clean();
+ fs.delete(localPath, false);
+ }
+ }
+
private void clear() throws IOException {
keys.clear();
firstKeyInFile = null;