/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ IOTests.class, SmallTests.class })
public class TestHFileScannerImplReferenceCount {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
@Rule
public TestName CASE = new TestName();
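// Run the whole suite once per BucketCache IOEngine; for every engine except "offheap",
// setUp() appends a backing file path under the per-test work dir.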
@Parameters(name = "{index}: ioengine={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
new Object[] { "mmap" }, new Object[] { "pmem" });
}
@Parameter
public String ioengine;
private static final Logger LOG =
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
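// A fixed 30-byte random suffix appended to every row key (see randLongBytes() below).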
private static final byte[] SUFFIX = randLongBytes();
private static final int CELL_COUNT = 1000;
private static byte[] randLongBytes() {
Random rand = new Random();
byte[] keys = new byte[30];
rand.nextBytes(keys);
return keys;
}
// This is a deep copy of UTIL's configuration; DON'T use a shallow copy.
private Configuration conf;
private Path workDir;
private FileSystem fs;
private Path hfilePath;
private Cell firstCell = null;
private Cell secondCell = null;
private ByteBuffAllocator allocator;
@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
// Set the max chunk size and min index entries to be very small for the index block, so that
// we can create an index block tree with level >= 2.
conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
// Create a bucket cache with 32MB.
conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
conf.setInt(BUFFER_SIZE_KEY, 1024);
conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
// With a zero minimum allocate size, every allocated ByteBuff comes from the pool.
conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
}
@Before
public void setUp() throws IOException {
String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
this.workDir = UTIL.getDataTestDir(caseName);
if (!"offheap".equals(ioengine)) {
ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
}
UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
this.firstCell = null;
this.secondCell = null;
this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
this.conf = new Configuration(UTIL.getConfiguration());
this.fs = this.workDir.getFileSystem(conf);
this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
}
@After
public void tearDown() throws IOException {
this.allocator.clean();
this.fs.delete(this.workDir, true);
}
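// The BucketCache persists queued blocks to its IOEngine asynchronously via writer threads;
// wait for that flush to complete so the reference counts observed below are stable.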
private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
Assert.assertTrue(cache instanceof CombinedBlockCache);
BlockCache[] blockCaches = cache.getBlockCaches();
Assert.assertEquals(2, blockCaches.length);
Assert.assertTrue(blockCaches[1] instanceof BucketCache);
TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
}
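// Writes cellCount KeyValues with a 1-byte block size, so each data block holds a single cell;
// together with the tiny index chunk size configured above, this yields a deep block index.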
private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
DataBlockEncoding encoding, int cellCount) throws IOException {
HFileContext context = new HFileContextBuilder().withBlockSize(1).withCompression(compression)
.withDataBlockEncoding(encoding).build();
try (HFile.Writer writer =
new HFile.WriterFactory(conf, new CacheConfig(conf)).withPath(fs, hfilePath)
.withFileContext(context).create()) {
Random rand = new Random(9713312); // Just a fixed seed.
for (int i = 0; i < cellCount; ++i) {
byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
// A random-length random value.
byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
KeyValue keyValue =
new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
if (firstCell == null) {
firstCell = keyValue;
} else if (secondCell == null) {
secondCell = keyValue;
}
writer.append(keyValue);
}
}
}
/**
* A careful UT for validating the reference count mechanism: if you want to change this UT,
* please read the design doc in HBASE-21879 first and make sure you understand the refCnt
* design.
*/
private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
throws Exception {
writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
HFileBlock curBlock, prevBlock;
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile whose data block index has 16 levels.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
// Only one refCnt for RPC path.
Assert.assertEquals(1, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
Assert.assertFalse(block1 == block2);
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
this.assertRefCnt(curBlock, 2);
// Seek to the same block again; curBlock won't change and nothing is read from the
// BlockCache, so the refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertTrue(curBlock == scanner.curBlock);
this.assertRefCnt(curBlock, 2);
prevBlock = curBlock;
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
this.assertRefCnt(prevBlock, 2);
this.assertRefCnt(curBlock, 2);
// After shipped() the prevBlock will be released, but curBlock is still referenced by the
// scanner.
scanner.shipped();
this.assertRefCnt(prevBlock, 1);
this.assertRefCnt(curBlock, 2);
// Try to ship again, though there is nothing left to send to the client.
scanner.shipped();
this.assertRefCnt(prevBlock, 1);
this.assertRefCnt(curBlock, 2);
// The curBlock will also be released.
scanner.close();
this.assertRefCnt(curBlock, 1);
// Finish the block1 & block2 RPC paths.
Assert.assertTrue(block1.release());
Assert.assertTrue(block2.release());
// Evict this file's blocks from the block cache.
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(0, prevBlock.refCnt());
Assert.assertEquals(0, curBlock.refCnt());
int count = 0;
Assert.assertTrue(scanner.seekTo());
++count;
while (scanner.next()) {
count++;
}
assertEquals(CELL_COUNT, count);
}
/**
* See HBASE-22480
*/
@Test
public void testSeekBefore() throws Exception {
HFileBlock curBlock, prevBlock;
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile whose data block index has 16 levels.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
// Wait until flushed to the IOEngine.
waitBucketCacheFlushed(defaultBC);
// One RPC reference path.
Assert.assertEquals(1, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
// Let the curBlock refer to block2.
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block2);
Assert.assertEquals(1, block2.refCnt());
this.assertRefCnt(curBlock, 2);
prevBlock = scanner.curBlock;
// Release block1; no other reference remains.
Assert.assertTrue(block1.release());
Assert.assertEquals(0, block1.refCnt());
// Release block2; no other reference remains.
Assert.assertTrue(block2.release());
Assert.assertEquals(0, block2.refCnt());
// Do the seekBefore: the new curBlock will be the block preceding the current one.
Assert.assertTrue(scanner.seekBefore(secondCell));
Assert.assertEquals(1, scanner.prevBlocks.size());
Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
curBlock = scanner.curBlock;
// The curBlock is read from the IOEngine, so it is a different block instance.
Assert.assertFalse(curBlock == block1);
// Two references for curBlock: 1. scanner; 2. blockCache.
this.assertRefCnt(curBlock, 2);
// Reference count of prevBlock must be unchanged because we haven't shipped.
this.assertRefCnt(prevBlock, 2);
// Call shipped().
scanner.shipped();
Assert.assertEquals(0, scanner.prevBlocks.size());
Assert.assertNotNull(scanner.curBlock);
this.assertRefCnt(curBlock, 2);
this.assertRefCnt(prevBlock, 1);
// Call close().
scanner.close();
Assert.assertNull(scanner.curBlock);
this.assertRefCnt(curBlock, 1);
this.assertRefCnt(prevBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(0, curBlock.refCnt());
Assert.assertEquals(0, prevBlock.refCnt());
// Reload block1.
block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
// Wait until flushed to the IOEngine.
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
Assert.assertTrue(block1.release());
Assert.assertEquals(0, block1.refCnt());
// Re-seek to the beginning.
Assert.assertTrue(scanner.seekTo());
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block1);
this.assertRefCnt(curBlock, 2);
// seekBefore() returns false because firstCell <= c[0], the first cell in the file.
Assert.assertFalse(scanner.seekBefore(firstCell));
// The curBlock shouldn't be released because we haven't called shipped() or close() yet.
this.assertRefCnt(curBlock, 2);
scanner.close();
this.assertRefCnt(curBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
Assert.assertEquals(0, curBlock.refCnt());
}
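// Expected refCnt differs by IOEngine: with "offheap" and "pmem" the bucket cache shares its
// backing memory with readers and holds one reference itself, while with "file" and "mmap"
// the block handed back carries no cache reference, so the expected count is one lower.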
private void assertRefCnt(HFileBlock block, int value) {
if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
Assert.assertEquals(value, block.refCnt());
} else {
Assert.assertEquals(value - 1, block.refCnt());
}
}
@Test
public void testDefault() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
}
@Test
public void testCompression() throws Exception {
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
}
@Test
public void testDataBlockEncoding() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
}
@Test
public void testDataBlockEncodingAndCompression() throws Exception {
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
}
@Test
public void testWithLruBlockCache() throws Exception {
HFileBlock curBlock;
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
// Disable the bucket cache so only the LruBlockCache is used.
conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile whose data block index has 16 levels.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
// Blocks served by the LruBlockCache are on-heap ExclusiveMemHFileBlocks, which are not
// ref-counted, so refCnt stays 0 even on the RPC reference path.
Assert.assertEquals(0, block1.refCnt());
Assert.assertEquals(0, block2.refCnt());
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
Assert.assertTrue(curBlock == block1);
Assert.assertEquals(0, curBlock.refCnt());
Assert.assertTrue(scanner.prevBlocks.isEmpty());
// Switch to next block
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertTrue(curBlock == block2);
Assert.assertEquals(0, curBlock.refCnt());
Assert.assertEquals(0, curBlock.retain().refCnt());
// Only pooled HFileBlocks are kept in prevBlocks; an ExclusiveMemHFileBlock is never kept
// there.
Assert.assertTrue(scanner.prevBlocks.isEmpty());
// Close the scanner.
scanner.close();
Assert.assertNull(scanner.curBlock);
Assert.assertTrue(scanner.prevBlocks.isEmpty());
}
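// With the block cache disabled entirely, a block is read straight from the filesystem into
// pooled (shared) memory, so the caller owns the only reference and must release it.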
@Test
public void testDisabledBlockCache() throws Exception {
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
// Disable the block cache entirely.
conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
Assert.assertNull(defaultBC);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Not combined: the cache is disabled.
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile whose data block index has 16 levels.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
DataBlockEncoding.NONE, reader).getHFileBlock();
Assert.assertTrue(block1.isSharedMem());
Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
Assert.assertEquals(1, block1.refCnt());
Assert.assertTrue(block1.release());
}
}