/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is used to test the functionality of the DataTieringManager.
*
* The mock online regions are stored in {@link TestDataTieringManager#testOnlineRegions}.
* For all tests, the setup of {@link TestDataTieringManager#testOnlineRegions} occurs only once.
* Please refer to {@link TestDataTieringManager#setupOnlineRegions()} for the structure.
* Additionally, a list of all store files is maintained in {@link TestDataTieringManager#hStoreFiles}.
* The characteristics of these store files are listed below:
* @formatter:off
* ## HStoreFile Information
*
* | HStoreFile | Region | Store | DataTiering | isHot |
* |------------------|--------------------|---------------------|-----------------------|-------|
* | hStoreFile0 | region1 | hStore11 | TIME_RANGE | true |
* | hStoreFile1 | region1 | hStore12 | NONE | true |
* | hStoreFile2 | region2 | hStore21 | TIME_RANGE | true |
* | hStoreFile3 | region2 | hStore22 | TIME_RANGE | false |
* @formatter:on
*/
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDataTieringManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDataTieringManager.class);
private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final long DAY = 24 * 60 * 60 * 1000;
private static Configuration defaultConf;
private static FileSystem fs;
private static BlockCache blockCache;
private static CacheConfig cacheConf;
private static Path testDir;
private static final Map<String, HRegion> testOnlineRegions = new HashMap<>();
private static DataTieringManager dataTieringManager;
private static final List<HStoreFile> hStoreFiles = new ArrayList<>();
/**
* Represents the current lexicographically increasing string used as a row key when writing
* HFiles. It is incremented each time {@link #nextString()} is called to generate unique row
* keys.
*/
private static String rowKeyString;
@BeforeClass
public static void setupBeforeClass() throws Exception {
testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
defaultConf = TEST_UTIL.getConfiguration();
updateCommonConfigurations();
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
dataTieringManager = DataTieringManager.getInstance();
rowKeyString = "";
}
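// Enables the global data-tiering feature and configures an off-heap bucket cache (size 32 MB)
// for the shared block cache created in setupFileSystemAndCache().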
private static void updateCommonConfigurations() {
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32);
}
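// Functional interfaces that let the shared test helpers below invoke a DataTieringManager
// method either by HFile Path or by BlockCacheKey.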
@FunctionalInterface
interface DataTieringMethodCallerWithPath {
boolean call(DataTieringManager manager, Path path) throws DataTieringException;
}
@FunctionalInterface
interface DataTieringMethodCallerWithKey {
boolean call(DataTieringManager manager, BlockCacheKey key) throws DataTieringException;
}
@Test
public void testDataTieringEnabledWithKey() throws IOException {
initializeTestEnvironment();
DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isDataTieringEnabled;
// Test with valid key
BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true);
// Test with another valid key
key = new BlockCacheKey(hStoreFiles.get(1).getPath(), 0, true, BlockType.DATA);
testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
// Test with valid key with no HFile Path
key = new BlockCacheKey(hStoreFiles.get(0).getPath().getName(), 0);
testDataTieringMethodWithKeyExpectingException(methodCallerWithKey, key,
new DataTieringException("BlockCacheKey Doesn't Contain HFile Path"));
}
@Test
public void testDataTieringEnabledWithPath() throws IOException {
initializeTestEnvironment();
DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isDataTieringEnabled;
// Test with valid path
Path hFilePath = hStoreFiles.get(1).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
// Test with another valid path
hFilePath = hStoreFiles.get(3).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
// Test with an incorrect path
hFilePath = new Path("incorrectPath");
testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
new DataTieringException("Incorrect HFile Path: " + hFilePath));
// Test with a non-existing HRegion path
Path basePath = hStoreFiles.get(0).getPath().getParent().getParent().getParent();
hFilePath = new Path(basePath, "incorrectRegion/cf1/filename");
testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist"));
// Test with a non-existing HStore path
basePath = hStoreFiles.get(0).getPath().getParent().getParent();
hFilePath = new Path(basePath, "incorrectCf/filename");
testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
new DataTieringException("HStore corresponding to " + hFilePath + " doesn't exist"));
}
@Test
public void testHotDataWithKey() throws IOException {
initializeTestEnvironment();
DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData;
// Test with valid key
BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true);
// Test with another valid key
key = new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA);
testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
}
@Test
public void testHotDataWithPath() throws IOException {
initializeTestEnvironment();
DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
// Test with valid path
Path hFilePath = hStoreFiles.get(2).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
// Test with another valid path
hFilePath = hStoreFiles.get(3).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
// Test with a filename where the corresponding HStoreFile is not present
hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName");
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
}
@Test
public void testPrefetchWhenDataTieringEnabled() throws IOException {
setPrefetchBlocksOnOpen();
initializeTestEnvironment();
// Evict blocks from cache by closing the files and passing evict on close.
// Then initialize the reader again. Since Prefetch on open is set to true, it should prefetch
// those blocks.
for (HStoreFile file : hStoreFiles) {
file.closeStoreFile(true);
file.initReader();
}
// Since we have one cold file among four files, only three should get prefetched.
Optional<Map<String, Pair<String, Long>>> fullyCachedFiles = blockCache.getFullyCachedFiles();
assertTrue("We should get the fully cached files from the cache", fullyCachedFiles.isPresent());
Waiter.waitFor(defaultConf, 10000, () -> fullyCachedFiles.get().size() == 3);
assertEquals("Number of fully cached files are incorrect", 3, fullyCachedFiles.get().size());
}
private void setPrefetchBlocksOnOpen() {
defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
}
@Test
public void testColdDataFiles() throws IOException {
initializeTestEnvironment();
Set<BlockCacheKey> allCachedBlocks = new HashSet<>();
for (HStoreFile file : hStoreFiles) {
allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
}
// Verify hStoreFile3 is identified as cold data
DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
Path hFilePath = hStoreFiles.get(3).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
// Verify all the other files in hStoreFiles are hot data
for (int i = 0; i < hStoreFiles.size() - 1; i++) {
hFilePath = hStoreFiles.get(i).getPath();
testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
}
try {
Set<String> coldFilePaths = dataTieringManager.getColdDataFiles(allCachedBlocks);
assertEquals(1, coldFilePaths.size());
} catch (DataTieringException e) {
fail("Unexpected DataTieringException: " + e.getMessage());
}
}
@Test
public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws IOException {
setCacheCompactBlocksOnWrite();
initializeTestEnvironment();
HRegion region = createHRegion("table3");
testCacheCompactedBlocksOnWrite(region, true);
}
@Test
public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException {
setCacheCompactBlocksOnWrite();
initializeTestEnvironment();
HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(5 * DAY));
testCacheCompactedBlocksOnWrite(region, true);
}
@Test
public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException {
setCacheCompactBlocksOnWrite();
initializeTestEnvironment();
HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(DAY));
testCacheCompactedBlocksOnWrite(region, false);
}
private void setCacheCompactBlocksOnWrite() {
defaultConf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true);
}
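/**
 * Adds a store with three freshly created files to the given region, triggers a major
 * compaction, and verifies whether the data blocks of the compacted file end up in the block
 * cache, depending on the region's data-tiering settings.
 */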
private void testCacheCompactedBlocksOnWrite(HRegion region, boolean expectDataBlocksCached)
throws IOException {
HStore hStore = createHStore(region, "cf1");
createTestFilesForCompaction(hStore);
hStore.refreshStoreFiles();
region.stores.put(Bytes.toBytes("cf1"), hStore);
testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);
long initialStoreFilesCount = hStore.getStorefilesCount();
long initialCacheDataBlockCount = blockCache.getDataBlockCount();
assertEquals(3, initialStoreFilesCount);
assertEquals(0, initialCacheDataBlockCount);
region.compact(true);
long compactedStoreFilesCount = hStore.getStorefilesCount();
long compactedCacheDataBlockCount = blockCache.getDataBlockCount();
assertEquals(1, compactedStoreFilesCount);
assertEquals(expectDataBlocksCached, compactedCacheDataBlockCount > 0);
}
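// Creates three store files whose cells are 2, 3 and 4 days old, so a time-range tiering
// threshold can classify the compaction output as hot or cold depending on the configured
// hot-data age.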
private void createTestFilesForCompaction(HStore hStore) throws IOException {
long currentTime = System.currentTimeMillis();
Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath();
Configuration configuration = hStore.getReadOnlyConfiguration();
createHStoreFile(storeDir, configuration, currentTime - 2 * DAY);
createHStoreFile(storeDir, configuration, currentTime - 3 * DAY);
createHStoreFile(storeDir, configuration, currentTime - 4 * DAY);
}
@Test
public void testPickColdDataFiles() throws IOException {
initializeTestEnvironment();
Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
assertEquals(1, coldDataFiles.size());
// hStoreFiles[3] is the cold file.
assertTrue(coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
}
/*
* Verify that both cold blocks are evicted when the bucket reaches its capacity, while the hot
* block remains in the cache.
*/
@Test
public void testBlockEvictions() throws Exception {
initializeTestEnvironment();
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
// Create three Cache keys with cold data files and a block with hot data.
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}
// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());
// Add an additional hot-data block into the cache, which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));
// Verify that the bucket cache now contains 2 hot blocks only.
// Both 8KB cold blocks are evicted to make room for the new 8KB block plus some additional
// space.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}
/*
* Verify that two of the three cold blocks are evicted when the bucket reaches its capacity, but
* one cold block remains in the cache since evicting two already frees the required space.
*/
@Test
public void testBlockEvictionsAllColdBlocks() throws Exception {
initializeTestEnvironment();
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
// Create three Cache keys with three cold data blocks.
// hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}
// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());
// Add an additional hot-data block into the cache, which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));
// Verify that the bucket cache now contains 1 cold block and a newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
}
/*
* Verify that a hot block is evicted along with the cold block when the bucket reaches its capacity.
*/
@Test
public void testBlockEvictionsHotBlocks() throws Exception {
initializeTestEnvironment();
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}
// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());
// Add an additional block, which should evict the only cold block along with one of the older hot blocks.
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));
// Verify that the bucket cache now contains 2 hot blocks.
// Only one of the older hot blocks is retained; the other one is the newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}
@Test
public void testFeatureKeyDisabled() throws Exception {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false);
initializeTestEnvironment();
try {
assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
// Verify that the DataTieringManager instance is not instantiated in the
// instantiate call above.
assertNull(DataTieringManager.getInstance());
// Also validate that data temperature is not honoured.
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
List<BlockCacheKey> cacheKeys = new ArrayList<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
LOG.info("Adding {}", key);
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(key)));
}
// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());
// Add an additional hot block, which triggers eviction.
BlockCacheKey newKey =
new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));
// Verify that the bucket still contains the only cold block and one newly added hot block.
// The older hot blocks are evicted and the data-tiering mechanism does not kick in to evict
// the cold block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
} finally {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
}
}
@Test
public void testCacheConfigShouldCacheFile() throws Exception {
// Evict the files from cache.
for (HStoreFile file : hStoreFiles) {
file.closeStoreFile(true);
}
// Verify that the API shouldCacheFileBlock returns the result correctly.
// hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files.
// hStoreFiles[3] is a cold file.
try {
assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
hStoreFiles.get(0).getFileInfo().getHFileInfo(),
hStoreFiles.get(0).getFileInfo().getConf()));
assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
hStoreFiles.get(1).getFileInfo().getHFileInfo(),
hStoreFiles.get(1).getFileInfo().getConf()));
assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
hStoreFiles.get(2).getFileInfo().getHFileInfo(),
hStoreFiles.get(2).getFileInfo().getConf()));
assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
hStoreFiles.get(3).getFileInfo().getHFileInfo(),
hStoreFiles.get(3).getFileInfo().getConf()));
} finally {
for (HStoreFile file : hStoreFiles) {
file.initReader();
}
}
}
@Test
public void testCacheOnReadColdFile() throws Exception {
// hStoreFiles[3] is a cold file. The blocks should not get cached after a readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(3);
BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA);
testCacheOnRead(hStoreFile, cacheKey, 23025, false);
}
@Test
public void testCacheOnReadHotFile() throws Exception {
// hStoreFiles[0] is a hot file. The blocks should get cached after a readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(0);
BlockCacheKey cacheKey =
new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
testCacheOnRead(hStoreFile, cacheKey, 23025, true);
}
private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize,
boolean expectedCached) throws Exception {
// Execute the read block API which will try to cache the block if the block is a hot block.
hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false,
false, false, key.getBlockType(), DataBlockEncoding.NONE);
// Validate that the hot block gets cached and cold block is not cached.
HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false, BlockType.DATA);
if (expectedCached) {
assertNotNull(block);
} else {
assertNull(block);
}
}
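/**
 * Waits until the cache holds the expected number of keys and then counts how many of the
 * remaining blocks belong to hot files versus cold files according to the DataTieringManager.
 */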
private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
int expectedColdBlocks) {
int numHotBlocks = 0, numColdBlocks = 0;
Waiter.waitFor(defaultConf, 10000, 100, () -> (expectedTotalKeys == keys.size()));
for (BlockCacheKey key : keys) {
try {
if (dataTieringManager.isHotData(key)) {
numHotBlocks++;
} else {
numColdBlocks++;
}
} catch (Exception e) {
fail("Unexpected exception!");
}
}
assertEquals(expectedHotBlocks, numHotBlocks);
assertEquals(expectedColdBlocks, numColdBlocks);
}
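/**
 * Invokes the given DataTieringManager method with the supplied path. If an exception is
 * expected, verifies the DataTieringException message; otherwise verifies the returned value.
 */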
private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
boolean expectedResult, DataTieringException exception) {
try {
boolean value = caller.call(dataTieringManager, path);
if (exception != null) {
fail("Expected DataTieringException to be thrown");
}
assertEquals(expectedResult, value);
} catch (DataTieringException e) {
if (exception == null) {
fail("Unexpected DataTieringException: " + e.getMessage());
}
assertEquals(exception.getMessage(), e.getMessage());
}
}
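/**
 * Same as the path-based helper above, but for DataTieringManager methods keyed by a
 * BlockCacheKey.
 */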
private void testDataTieringMethodWithKey(DataTieringMethodCallerWithKey caller,
BlockCacheKey key, boolean expectedResult, DataTieringException exception) {
try {
boolean value = caller.call(dataTieringManager, key);
if (exception != null) {
fail("Expected DataTieringException to be thrown");
}
assertEquals(expectedResult, value);
} catch (DataTieringException e) {
if (exception == null) {
fail("Unexpected DataTieringException: " + e.getMessage());
}
assertEquals(exception.getMessage(), e.getMessage());
}
}
private void testDataTieringMethodWithPathExpectingException(
DataTieringMethodCallerWithPath caller, Path path, DataTieringException exception) {
testDataTieringMethodWithPath(caller, path, false, exception);
}
private void testDataTieringMethodWithPathNoException(DataTieringMethodCallerWithPath caller,
Path path, boolean expectedResult) {
testDataTieringMethodWithPath(caller, path, expectedResult, null);
}
private void testDataTieringMethodWithKeyExpectingException(DataTieringMethodCallerWithKey caller,
BlockCacheKey key, DataTieringException exception) {
testDataTieringMethodWithKey(caller, key, false, exception);
}
private void testDataTieringMethodWithKeyNoException(DataTieringMethodCallerWithKey caller,
BlockCacheKey key, boolean expectedResult) {
testDataTieringMethodWithKey(caller, key, expectedResult, null);
}
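// Recreates the file system, block cache and mock online regions, giving tests a known
// starting state.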
private static void initializeTestEnvironment() throws IOException {
setupFileSystemAndCache();
setupOnlineRegions();
}
private static void setupFileSystemAndCache() throws IOException {
fs = HFileSystem.get(defaultConf);
blockCache = BlockCacheFactory.createBlockCache(defaultConf);
cacheConf = new CacheConfig(defaultConf, blockCache);
}
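/**
 * Builds the mock regions described in the class-level table: region1 hosts hStore11
 * (TIME_RANGE tiering with a one-day hot-data age, file written at the current time) and
 * hStore12 (no tiering, file one day old), while region2 enables TIME_RANGE tiering at the
 * table level with a 2.5-day hot-data age and hosts hStore21 (file two days old, hot) and
 * hStore22 (file three days old, cold).
 */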
private static void setupOnlineRegions() throws IOException {
testOnlineRegions.clear();
hStoreFiles.clear();
long day = 24 * 60 * 60 * 1000;
long currentTime = System.currentTimeMillis();
HRegion region1 = createHRegion("table1");
HStore hStore11 = createHStore(region1, "cf1", getConfWithTimeRangeDataTieringEnabled(day));
hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(),
hStore11.getReadOnlyConfiguration(), currentTime));
hStore11.refreshStoreFiles();
HStore hStore12 = createHStore(region1, "cf2");
hStoreFiles.add(createHStoreFile(hStore12.getStoreContext().getFamilyStoreDirectoryPath(),
hStore12.getReadOnlyConfiguration(), currentTime - day));
hStore12.refreshStoreFiles();
region1.stores.put(Bytes.toBytes("cf1"), hStore11);
region1.stores.put(Bytes.toBytes("cf2"), hStore12);
HRegion region2 =
createHRegion("table2", getConfWithTimeRangeDataTieringEnabled((long) (2.5 * day)));
HStore hStore21 = createHStore(region2, "cf1");
hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(),
hStore21.getReadOnlyConfiguration(), currentTime - 2 * day));
hStore21.refreshStoreFiles();
HStore hStore22 = createHStore(region2, "cf2");
hStoreFiles.add(createHStoreFile(hStore22.getStoreContext().getFamilyStoreDirectoryPath(),
hStore22.getReadOnlyConfiguration(), currentTime - 3 * day));
hStore22.refreshStoreFiles();
region2.stores.put(Bytes.toBytes("cf1"), hStore21);
region2.stores.put(Bytes.toBytes("cf2"), hStore22);
for (HStoreFile file : hStoreFiles) {
file.initReader();
}
testOnlineRegions.put(region1.getRegionInfo().getEncodedName(), region1);
testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2);
}
private static HRegion createHRegion(String table) throws IOException {
return createHRegion(table, defaultConf);
}
private static HRegion createHRegion(String table, Configuration conf) throws IOException {
TableName tableName = TableName.valueOf(table);
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
.setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY))
.setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY))
.build();
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
Configuration testConf = new Configuration(conf);
CommonFSUtils.setRootDir(testConf, testDir);
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
HRegion region = new HRegion(regionFs, null, conf, htd, null);
// Manually sets the BlockCache for the HRegion instance.
// This is necessary because the region server is not started within this method,
// and therefore the BlockCache needs to be explicitly configured.
region.setBlockCache(blockCache);
return region;
}
private static HStore createHStore(HRegion region, String columnFamily) throws IOException {
return createHStore(region, columnFamily, defaultConf);
}
private static HStore createHStore(HRegion region, String columnFamily, Configuration conf)
throws IOException {
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily))
.setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY))
.setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY))
.build();
return new HStore(region, columnFamilyDescriptor, conf, false);
}
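// Returns a copy of the default configuration with TIME_RANGE data tiering enabled and the
// given hot-data age in milliseconds.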
private static Configuration getConfWithTimeRangeDataTieringEnabled(long hotDataAge) {
Configuration conf = new Configuration(defaultConf);
conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name());
conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAge));
return conf;
}
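// Writes a small store file containing ten 4 KB cells at the given timestamp so that the file's
// tracked time range reflects that timestamp for tiering decisions.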
private static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp)
throws IOException {
String columnFamily = storeDir.getName();
StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeDir).withFileContext(new HFileContextBuilder().build()).build();
writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), timestamp);
return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true);
}
/**
* Writes random data to a store file with rows arranged in lexicographically increasing order.
* Each row is generated using the {@link #nextString()} method, ensuring that each subsequent row
* is lexicographically larger than the previous one.
*/
private static void writeStoreFileRandomData(final StoreFileWriter writer, byte[] columnFamily,
long timestamp) throws IOException {
int cellsPerFile = 10;
byte[] qualifier = Bytes.toBytes("qualifier");
byte[] value = generateRandomBytes(4 * 1024);
try {
for (int i = 0; i < cellsPerFile; i++) {
byte[] row = Bytes.toBytes(nextString());
writer.append(new KeyValue(row, columnFamily, qualifier, timestamp, value));
}
} finally {
writer.appendTrackedTimestampsToMetadata();
writer.close();
}
}
private static byte[] generateRandomBytes(int sizeInBytes) {
Random random = new Random();
byte[] randomBytes = new byte[sizeInBytes];
random.nextBytes(randomBytes);
return randomBytes;
}
/**
* Returns a lexicographically larger string each time it is called.
*/
private static String nextString() {
if (rowKeyString == null || rowKeyString.isEmpty()) {
rowKeyString = "a";
}
char lastChar = rowKeyString.charAt(rowKeyString.length() - 1);
if (lastChar < 'z') {
rowKeyString = rowKeyString.substring(0, rowKeyString.length() - 1) + (char) (lastChar + 1);
} else {
rowKeyString = rowKeyString + "a";
}
return rowKeyString;
}
}