| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.client; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; |
| import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.coprocessor.RegionObserver; |
| import org.apache.hadoop.hbase.io.hfile.BlockCache; |
| import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.CachedBlock; |
| import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; |
| import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; |
| import org.apache.hadoop.hbase.regionserver.BloomType; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HStore; |
| import org.apache.hadoop.hbase.regionserver.InternalScanner; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext; |
| import org.apache.hadoop.hbase.testclassification.ClientTests; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; |
| |
| @Category({ LargeTests.class, ClientTests.class }) |
| @SuppressWarnings("deprecation") |
| public class TestBlockEvictionFromClient { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class); |
  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static final int NO_OF_THREADS = 3;
| private static byte[] ROW = Bytes.toBytes("testRow"); |
| private static byte[] ROW1 = Bytes.toBytes("testRow1"); |
| private static byte[] ROW2 = Bytes.toBytes("testRow2"); |
| private static byte[] ROW3 = Bytes.toBytes("testRow3"); |
| private static byte[] FAMILY = Bytes.toBytes("testFamily"); |
| private static byte[][] FAMILIES_1 = new byte[1][0]; |
| private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); |
| private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); |
| private static byte[] data = new byte[1000]; |
| private static byte[] data2 = Bytes.add(data, data); |
| protected static int SLAVES = 1; |
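  // Latches used to coordinate the test with the coprocessor hooks: 'latch' gates
  // get/next progress inside the region observer, 'getLatch' holds gets open,
  // 'compactionLatch' stalls a scanner during compaction, and 'exceptionLatch'
  // delays an injected scan exception.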
| private static CountDownLatch latch; |
| private static CountDownLatch getLatch; |
| private static CountDownLatch compactionLatch; |
| private static CountDownLatch exceptionLatch; |
| |
| @Rule |
| public TestName name = new TestName(); |
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| ROWS[0] = ROW; |
| ROWS[1] = ROW1; |
| Configuration conf = TEST_UTIL.getConfiguration(); |
| conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, |
| MultiRowMutationEndpoint.class.getName()); |
| conf.setInt("hbase.regionserver.handler.count", 20); |
| conf.setInt("hbase.bucketcache.size", 400); |
| conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); |
| conf.setFloat("hfile.block.cache.size", 0.2f); |
| conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); |
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
| conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); |
| FAMILIES_1[0] = FAMILY; |
| TEST_UTIL.startMiniCluster(SLAVES); |
| } |
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| TEST_UTIL.shutdownMiniCluster(); |
| } |
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @Before |
| public void setUp() throws Exception { |
| CustomInnerRegionObserver.waitForGets.set(false); |
| CustomInnerRegionObserver.countOfNext.set(0); |
| CustomInnerRegionObserver.countOfGets.set(0); |
| } |
| |
| /** |
| * @throws java.lang.Exception |
| */ |
| @After |
| public void tearDown() throws Exception { |
| if (latch != null) { |
| while (latch.getCount() > 0) { |
| latch.countDown(); |
| } |
| } |
| if (getLatch != null) { |
| getLatch.countDown(); |
| } |
| if (compactionLatch != null) { |
| compactionLatch.countDown(); |
| } |
| if (exceptionLatch != null) { |
| exceptionLatch.countDown(); |
| } |
| latch = null; |
| getLatch = null; |
| compactionLatch = null; |
| exceptionLatch = null; |
| CustomInnerRegionObserver.throwException.set(false); |
| // Clean up the tables for every test case |
| TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames(); |
| for (TableName tableName : listTableNames) { |
| if (!tableName.isSystemTable()) { |
| TEST_UTIL.getAdmin().disableTable(tableName); |
| TEST_UTIL.getAdmin().deleteTable(tableName); |
| } |
| } |
| } |
| |
| @Test |
| public void testBlockEvictionWithParallelScans() throws Exception { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName) |
| .getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
      // Insert data. Two rows are added.
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); |
      // The data is still in the memstore, so no cache changes are expected yet.
      // Flush the data; this should create one HFile with 2 blocks.
| region.flush(true); |
      // Load the cache by starting three parallel scan threads
| ScanThread[] scanThreads = initiateScan(table, false); |
| Thread.sleep(100); |
| checkForBlockEviction(cache, false, false); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| // read the data and expect same blocks, one new hit, no misses |
| assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
      // Insert a second column and read the row: no new blocks, 3 new hits
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| Result r = table.get(new Get(ROW)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| // flush, one new block |
| System.out.println("Flushing cache"); |
| region.flush(true); |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| // compact, net minus two blocks, two hits, no misses |
| System.out.println("Compacting"); |
| assertEquals(2, store.getStorefilesCount()); |
| store.triggerMajorCompaction(); |
| region.compact(true); |
| waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max |
| assertEquals(1, store.getStorefilesCount()); |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| // read the row, this should be a cache miss because we don't cache data |
| // blocks on compaction |
| r = table.get(new Get(ROW)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testParallelGetsAndScans() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(2); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
| insertData(table); |
| // flush the data |
| System.out.println("Flushing cache"); |
      // Should create one HFile with 2 blocks
| region.flush(true); |
      // Start three scan threads
| CustomInnerRegionObserver.waitForGets.set(true); |
| ScanThread[] scanThreads = initiateScan(table, false); |
      // Start three get threads
| GetThread[] getThreads = initiateGet(table, false, false); |
| checkForBlockEviction(cache, false, false); |
| CustomInnerRegionObserver.waitForGets.set(false); |
| checkForBlockEviction(cache, false, false); |
| for (GetThread thread : getThreads) { |
| thread.join(); |
| } |
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
| checkForBlockEviction(cache, true, false); |
| getLatch.countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| System.out.println("Scans should have returned the bloks"); |
| // Check with either true or false |
| CustomInnerRegionObserver.waitForGets.set(false); |
| // The scan should also have released the blocks by now |
| checkForBlockEviction(cache, true, true); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
      // The data is now spread across three HFiles
| CustomInnerRegionObserver.waitForGets.set(true); |
      // Start three get threads
| GetThread[] getThreads = initiateGet(table, false, false); |
| Thread.sleep(200); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (GetThread thread : getThreads) { |
| thread.join(); |
| } |
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
| checkForBlockEviction(cache, true, false); |
| getLatch.countDown(); |
| System.out.println("Gets should have returned the bloks"); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
  @Test
  // TODO: check how the block index works here
| public void testGetsWithMultiColumnsAndExplicitTracker() |
| throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| BlockCache cache = setCacheProperties(region); |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| for (int i = 1; i < 10; i++) { |
| put = new Put(ROW); |
| put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2); |
| table.put(put); |
| if (i % 2 == 0) { |
| region.flush(true); |
| } |
| } |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
      // The row now spans multiple HFiles
| CustomInnerRegionObserver.waitForGets.set(true); |
      // Start three get threads
| GetThread[] getThreads = initiateGet(table, true, false); |
| Thread.sleep(200); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| int refCount = 0; |
| int noOfBlocksWithRef = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| System.out.println("The refCount is " + refCount); |
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| noOfBlocksWithRef++; |
| } |
| } |
| assertTrue(usedBlocksFound); |
      // the number of blocks with a non-zero refcount
| assertEquals(10, noOfBlocksWithRef); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (GetThread thread : getThreads) { |
| thread.join(); |
| } |
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
| checkForBlockEviction(cache, true, false); |
| getLatch.countDown(); |
| System.out.println("Gets should have returned the bloks"); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| byte[][] fams = new byte[10][]; |
| fams[0] = FAMILY; |
| for (int i = 1; i < 10; i++) { |
        fams[i] = Bytes.toBytes("testFamily" + i);
| } |
| table = TEST_UTIL.createTable(tableName, fams, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| BlockCache cache = setCacheProperties(region); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| for (int i = 1; i < 10; i++) { |
| put = new Put(ROW); |
| put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); |
| table.put(put); |
| if (i % 2 == 0) { |
| region.flush(true); |
| } |
| } |
| region.flush(true); |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
      // The data is now spread across multiple HFiles
| CustomInnerRegionObserver.waitForGets.set(true); |
      // Start three get threads
| GetThread[] getThreads = initiateGet(table, true, true); |
| Thread.sleep(200); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| int refCount = 0; |
| int noOfBlocksWithRef = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| System.out.println("The refCount is " + refCount); |
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| noOfBlocksWithRef++; |
| } |
| } |
| assertTrue(usedBlocksFound); |
      // the number of blocks with a non-zero refcount
| assertEquals(3, noOfBlocksWithRef); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (GetThread thread : getThreads) { |
| thread.join(); |
| } |
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
| checkForBlockEviction(cache, true, false); |
| getLatch.countDown(); |
| System.out.println("Gets should have returned the bloks"); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testBlockRefCountAfterSplits() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
| TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName); |
      // This test expects the RPC refcount of cached data blocks to be 0 after a split.
      // After the split, two daughter regions are opened and a compaction is scheduled to
      // get rid of references to the parent region's HFiles. Compaction increases the
      // refcount of cached data blocks by 1. The test is flaky since compaction can kick
      // in at any time; to avoid that, the table is created with compaction disabled.
| table = TEST_UTIL.createTable( |
| TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1, |
| null, BloomType.ROW, 1024, null); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW2); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| put = new Put(ROW3); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
| ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers()); |
| int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size(); |
| LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(), |
| regionCount); |
| TEST_UTIL.getAdmin().split(tableName, ROW1); |
| // Wait for splits |
| TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount); |
| region.compact(true); |
| List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions(); |
      for (HRegion r : regions) {
        LOG.info("Compaction state: {}", r.getCompactionState());
| TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE)); |
| } |
| LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
      // Though the split created HalfStoreFileReaders, the firstkey and lastkey scanners
      // should be closed in order to return those blocks
| iterateBlockCache(cache, iterator); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testMultiGets() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(2); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
      // The data is now spread across three HFiles
| CustomInnerRegionObserver.waitForGets.set(true); |
      // Start three multi-get threads
| MultiGetThread[] getThreads = initiateMultiGet(table); |
| Thread.sleep(200); |
| int refCount; |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| boolean foundNonZeroBlock = false; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
| assertEquals(NO_OF_THREADS, refCount); |
| foundNonZeroBlock = true; |
| } |
| } |
| assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (MultiGetThread thread : getThreads) { |
| thread.join(); |
| } |
      // Verify that the gets have returned the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // give some time for the block refcounts to be decremented
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
| getLatch.countDown(); |
| System.out.println("Gets should have returned the bloks"); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| @Test |
| public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
      // Check that the blocks are released when the scanner closes
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| byte[][] fams = new byte[10][]; |
| fams[0] = FAMILY; |
| for (int i = 1; i < 10; i++) { |
        fams[i] = Bytes.toBytes("testFamily" + i);
| } |
| table = TEST_UTIL.createTable(tableName, fams, 1, 1024, |
| CustomInnerRegionObserver.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| BlockCache cache = setCacheProperties(region); |
| |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| region.flush(true); |
| for (int i = 1; i < 10; i++) { |
| put = new Put(ROW); |
| put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); |
| table.put(put); |
| if (i % 2 == 0) { |
| region.flush(true); |
| } |
| } |
| region.flush(true); |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| region.flush(true); |
      // The data is now spread across multiple HFiles
      // Start three scan threads
| ScanThread[] scanThreads = initiateScan(table, true); |
| Thread.sleep(200); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| int refCount = 0; |
| int noOfBlocksWithRef = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| System.out.println("The refCount is " + refCount); |
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| noOfBlocksWithRef++; |
| } |
| } |
| assertTrue(usedBlocksFound); |
      // the number of blocks with a non-zero refcount
| assertEquals(12, noOfBlocksWithRef); |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
      // give some time for the block refcounts to be decremented
| checkForBlockEviction(cache, true, false); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
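  /**
   * Enables cache-on-write and evict-on-close on every store of the region and
   * returns the block cache seen by the last store.
   */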
| private BlockCache setCacheProperties(HRegion region) { |
| Iterator<HStore> strItr = region.getStores().iterator(); |
| BlockCache cache = null; |
| while (strItr.hasNext()) { |
| HStore store = strItr.next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| // Use the last one |
| cache = cacheConf.getBlockCache().get(); |
| } |
| return cache; |
| } |
| |
| @Test |
| public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, |
| InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(2); |
      // Check whether get() releases its blocks when it closes
| getLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserverWrapper.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
      // Insert data. Two rows are added.
| insertData(table); |
| // flush the data |
| System.out.println("Flushing cache"); |
      // Should create one HFile with 2 blocks
| region.flush(true); |
      // Start three scan threads
| CustomInnerRegionObserver.waitForGets.set(true); |
| ScanThread[] scanThreads = initiateScan(table, false); |
      // Start three get threads
| GetThread[] getThreads = initiateGet(table, false, false); |
      // The scan case may already have decremented the block refcounts because the
      // scanner was wrapped before the postScannerNext hook executed.
      // give some time for the block refcounts to be decremented
| Thread.sleep(100); |
| CustomInnerRegionObserver.waitForGets.set(false); |
| checkForBlockEviction(cache, false, false); |
      // count down the latch
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (GetThread thread : getThreads) { |
| thread.join(); |
| } |
| getLatch.countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testScanWithCompaction() throws IOException, InterruptedException { |
| testScanWithCompactionInternals(name.getMethodName(), false); |
| } |
| |
| @Test |
| public void testReverseScanWithCompaction() throws IOException, InterruptedException { |
| testScanWithCompactionInternals(name.getMethodName(), true); |
| } |
| |
| private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) |
| throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
| compactionLatch = new CountDownLatch(1); |
| TableName tableName = TableName.valueOf(tableNameStr); |
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserverWrapper.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
      // Insert data. Two rows are added.
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); |
      // Should create one HFile with 2 blocks
| region.flush(true); |
      int refCount = 0;
      // Insert a second column so the next flush creates a second HFile
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| // flush, one new block |
| System.out.println("Flushing cache"); |
| region.flush(true); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
      // Start three scan threads
| ScanThread[] scanThreads = initiateScan(table, reversed); |
| Thread.sleep(100); |
| iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
| assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); |
| usedBlocksFound = false; |
| System.out.println("Compacting"); |
| assertEquals(2, store.getStorefilesCount()); |
| store.triggerMajorCompaction(); |
| region.compact(true); |
| waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max |
| assertEquals(1, store.getStorefilesCount()); |
      // Even after compaction completes, some blocks cannot be evicted because
      // the scan is still referencing them
| iterator = cache.iterator(); |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // Blocks still in use will have a refcount of 3 as they are not yet released
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
| assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); |
| // Should not throw exception |
| compactionLatch.countDown(); |
| latch.countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| // by this time all blocks should have been evicted |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| Result r = table.get(new Get(ROW)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); |
| // The gets would be working on new blocks |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() |
| throws IOException, InterruptedException { |
| // do flush and scan in parallel |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
| compactionLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserverWrapper.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
| |
      // Insert data. Two rows are added.
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); |
      // Should create one HFile with 2 blocks
| region.flush(true); |
      int refCount = 0;
      // Insert a second column so the next flush creates a second HFile
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| // flush, one new block |
| System.out.println("Flushing cache"); |
| region.flush(true); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
      // Start three scan threads
| ScanThread[] scanThreads = initiateScan(table, false); |
| Thread.sleep(100); |
| iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
      // Make another put and do a flush
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| // flush, one new block |
| System.out.println("Flushing cache"); |
| region.flush(true); |
| assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); |
| usedBlocksFound = false; |
| System.out.println("Compacting"); |
| assertEquals(3, store.getStorefilesCount()); |
| store.triggerMajorCompaction(); |
| region.compact(true); |
| waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max |
| assertEquals(1, store.getStorefilesCount()); |
      // Even after compaction completes, some blocks cannot be evicted because
      // the scan is still referencing them
| iterator = cache.iterator(); |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // Blocks still in use will have a refcount of 3 as they are not yet released
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
| assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); |
| // Should not throw exception |
| compactionLatch.countDown(); |
| latch.countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| // by this time all blocks should have been evicted |
| iterator = cache.iterator(); |
      // Since a flush and compaction happened after the scan started, we need to
      // ensure that all the original blocks of the compacted file are also removed
| iterateBlockCache(cache, iterator); |
| Result r = table.get(new Get(ROW)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); |
| // The gets would be working on new blocks |
| iterator = cache.iterator(); |
| iterateBlockCache(cache, iterator); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
| |
| @Test |
| public void testScanWithException() throws IOException, InterruptedException { |
| Table table = null; |
| try { |
| latch = new CountDownLatch(1); |
| exceptionLatch = new CountDownLatch(1); |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
      // Create KVs that will span two blocks
      // Create a table with a block size of 1024
| table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, |
| CustomInnerRegionObserverWrapper.class.getName()); |
| // get the block cache and region |
| RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); |
| String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); |
| HRegion region = |
| TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); |
| HStore store = region.getStores().iterator().next(); |
| CacheConfig cacheConf = store.getCacheConfig(); |
| cacheConf.setCacheDataOnWrite(true); |
| cacheConf.setEvictOnClose(true); |
| BlockCache cache = cacheConf.getBlockCache().get(); |
      // Insert data. Two rows are added.
| insertData(table); |
| // flush the data |
| System.out.println("Flushing cache"); |
      // Should create one HFile with 2 blocks
| region.flush(true); |
| CustomInnerRegionObserver.throwException.set(true); |
| ScanThread[] scanThreads = initiateScan(table, false); |
      // The scan case may already have decremented the block refcounts because the
      // scanner was wrapped before the postScannerNext hook executed.
      // give some time for the block refcounts to be decremented
| Thread.sleep(100); |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| boolean usedBlocksFound = false; |
| int refCount = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
| assertTrue(usedBlocksFound); |
| exceptionLatch.countDown(); |
      // count down the latch
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| for (ScanThread thread : scanThreads) { |
| thread.join(); |
| } |
| iterator = cache.iterator(); |
| usedBlocksFound = false; |
| refCount = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| if (refCount != 0) { |
          // In-use blocks should have a refcount equal to the number of threads (3)
| assertEquals(NO_OF_THREADS, refCount); |
| usedBlocksFound = true; |
| } |
| } |
| assertFalse(usedBlocksFound); |
      // We should always see a 0 ref count, since after HBASE-16604 the scanner is always recreated
| assertEquals(0, refCount); |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| } |
| |
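  /**
   * Walks the block cache and asserts that every block's RPC reference count has
   * dropped back to zero, i.e. no in-flight get/scan is still holding a block.
   */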
| private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { |
| int refCount; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| LOG.info("BucketCache {} {}", cacheKey, refCount); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| LOG.info("CombinedBlockCache {} {}", cacheKey, refCount); |
| } else { |
| continue; |
| } |
| assertEquals(0, refCount); |
| } |
| } |
| |
| private void insertData(Table table) throws IOException { |
| Put put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| put = new Put(ROW1); |
| put.addColumn(FAMILY, QUALIFIER, data); |
| table.put(put); |
| put = new Put(ROW); |
| put.addColumn(FAMILY, QUALIFIER2, data2); |
| table.put(put); |
| } |
| |
| private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException, |
| InterruptedException { |
| ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; |
| for (int i = 0; i < NO_OF_THREADS; i++) { |
| scanThreads[i] = new ScanThread(table, reverse); |
| } |
| for (ScanThread thread : scanThreads) { |
| thread.start(); |
| } |
| return scanThreads; |
| } |
| |
| private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) |
| throws IOException, InterruptedException { |
| GetThread[] getThreads = new GetThread[NO_OF_THREADS]; |
| for (int i = 0; i < NO_OF_THREADS; i++) { |
| getThreads[i] = new GetThread(table, tracker, multipleCFs); |
| } |
| for (GetThread thread : getThreads) { |
| thread.start(); |
| } |
| return getThreads; |
| } |
| |
| private MultiGetThread[] initiateMultiGet(Table table) |
| throws IOException, InterruptedException { |
| MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; |
| for (int i = 0; i < NO_OF_THREADS; i++) { |
| multiGetThreads[i] = new MultiGetThread(table); |
| } |
| for (MultiGetThread thread : multiGetThreads) { |
| thread.start(); |
| } |
| return multiGetThreads; |
| } |
| |
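  /**
   * Waits until the expected number of get/next calls have reached the coprocessor
   * hook, then checks the RPC reference counts of the cached blocks against the
   * currently outstanding gets and scans.
   */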
| private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) |
| throws InterruptedException { |
    if (CustomInnerRegionObserver.waitForGets.get()) {
| while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { |
| Thread.sleep(100); |
| } |
| } else { |
| while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { |
| Thread.sleep(100); |
| } |
| } |
| Iterator<CachedBlock> iterator = cache.iterator(); |
| int refCount = 0; |
| while (iterator.hasNext()) { |
| CachedBlock next = iterator.next(); |
| BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); |
| if (cache instanceof BucketCache) { |
| refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); |
| } else if (cache instanceof CombinedBlockCache) { |
| refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); |
| } else { |
| continue; |
| } |
| System.out.println(" the refcount is " + refCount + " block is " + cacheKey); |
| if (CustomInnerRegionObserver.waitForGets.get()) { |
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scan would also have touched these blocks, and it would have
          // touched all 3 of them
| if (getClosed) { |
| // If get has closed only the scan's blocks would be available |
| assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); |
| } else { |
| assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); |
| } |
| } |
| } else { |
        // The get would also have touched these blocks, but only 2 of them additionally
        if (expectOnlyZero) {
          assertEquals(0, refCount);
| } |
| if (refCount != 0) { |
| if (getLatch == null) { |
| assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); |
| } else { |
| assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); |
| } |
| } |
| } |
| } |
| CustomInnerRegionObserver.getCdl().get().countDown(); |
| } |
| |
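  /** Issues a multi-get for ROW and ROW1; the shared latch holds the RPC open. */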
| private static class MultiGetThread extends Thread { |
| private final Table table; |
| private final List<Get> gets = new ArrayList<>(); |
| public MultiGetThread(Table table) { |
| this.table = table; |
| } |
| @Override |
| public void run() { |
| gets.add(new Get(ROW)); |
| gets.add(new Get(ROW1)); |
| try { |
| CustomInnerRegionObserver.getCdl().set(latch); |
| Result[] r = table.get(gets); |
| assertTrue(Bytes.equals(r[0].getRow(), ROW)); |
| assertTrue(Bytes.equals(r[1].getRow(), ROW1)); |
      } catch (IOException e) {
        // do nothing
      }
| } |
| } |
| |
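  /** Issues a single get for ROW, optionally exercising the explicit column tracker. */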
| private static class GetThread extends Thread { |
| private final Table table; |
| private final boolean tracker; |
| private final boolean multipleCFs; |
| |
| public GetThread(Table table, boolean tracker, boolean multipleCFs) { |
| this.table = table; |
| this.tracker = tracker; |
| this.multipleCFs = multipleCFs; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| initiateGet(table); |
| } catch (IOException e) { |
| // do nothing |
| } |
| } |
| |
| private void initiateGet(Table table) throws IOException { |
| Get get = new Get(ROW); |
| if (tracker) { |
| if (!multipleCFs) { |
| get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); |
| get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); |
| get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); |
| // Unknown key |
| get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); |
| } else { |
| get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); |
| get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); |
| get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); |
| // Unknown key |
| get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); |
| } |
| } |
| CustomInnerRegionObserver.getCdl().set(latch); |
| Result r = table.get(get); |
| System.out.println(r); |
| if (!tracker) { |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); |
| } else { |
| if (!multipleCFs) { |
| assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); |
| assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); |
| } else { |
| assertTrue(Bytes.equals( |
| r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), |
| data2)); |
| assertTrue(Bytes.equals( |
| r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), |
| data2)); |
| assertTrue(Bytes.equals( |
| r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), |
| data2)); |
| } |
| } |
| } |
| } |
| |
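  /** Runs a full (optionally reversed) scan and verifies the rows come back in order. */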
| private static class ScanThread extends Thread { |
| private final Table table; |
| private final boolean reverse; |
| |
| public ScanThread(Table table, boolean reverse) { |
| this.table = table; |
| this.reverse = reverse; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| initiateScan(table); |
| } catch (IOException e) { |
| // do nothing |
| } |
| } |
| |
| private void initiateScan(Table table) throws IOException { |
| Scan scan = new Scan(); |
| if (reverse) { |
| scan.setReversed(true); |
| } |
| CustomInnerRegionObserver.getCdl().set(latch); |
| ResultScanner resScanner = table.getScanner(scan); |
| int i = (reverse ? ROWS.length - 1 : 0); |
| boolean resultFound = false; |
| for (Result result : resScanner) { |
| resultFound = true; |
| System.out.println(result); |
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
| } |
| assertTrue(resultFound); |
| } |
| } |
| |
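  /** Waits up to {@code timeout} ms for the store file count to reach {@code count}. */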
| private void waitForStoreFileCount(HStore store, int count, int timeout) |
| throws InterruptedException { |
| long start = System.currentTimeMillis(); |
| while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { |
| Thread.sleep(100); |
| } |
| System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + |
| store.getStorefilesCount()); |
| assertEquals(count, store.getStorefilesCount()); |
| } |
| |
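  /**
   * A delegating RegionScanner whose nextRaw() can stall on the compaction latch or
   * throw an injected exception, letting tests control when a scan releases its blocks.
   */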
| private static class CustomScanner implements RegionScanner { |
| |
    private final RegionScanner delegate;
| |
| public CustomScanner(RegionScanner delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| public boolean next(List<Cell> results) throws IOException { |
| return delegate.next(results); |
| } |
| |
| @Override |
| public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { |
| return delegate.next(result, scannerContext); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result) throws IOException { |
| return delegate.nextRaw(result); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { |
| boolean nextRaw = delegate.nextRaw(result, context); |
| if (compactionLatch != null && compactionLatch.getCount() > 0) { |
| try { |
| compactionLatch.await(); |
        } catch (InterruptedException ie) {
          // ignore and continue
        }
| } |
| |
| if (CustomInnerRegionObserver.throwException.get()) { |
| if (exceptionLatch.getCount() > 0) { |
| try { |
| exceptionLatch.await(); |
          } catch (InterruptedException e) {
            // ignore and continue
          }
| throw new IOException("throw exception"); |
| } |
| } |
| return nextRaw; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| delegate.close(); |
| } |
| |
| @Override |
| public RegionInfo getRegionInfo() { |
| return delegate.getRegionInfo(); |
| } |
| |
| @Override |
| public boolean isFilterDone() throws IOException { |
| return delegate.isFilterDone(); |
| } |
| |
| @Override |
| public boolean reseek(byte[] row) throws IOException { |
| return false; |
| } |
| |
| @Override |
| public long getMaxResultSize() { |
| return delegate.getMaxResultSize(); |
| } |
| |
| @Override |
| public long getMvccReadPoint() { |
| return delegate.getMvccReadPoint(); |
| } |
| |
| @Override |
| public int getBatch() { |
| return delegate.getBatch(); |
| } |
| } |
| |
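  /** Region observer that wraps every opened scanner in a {@link CustomScanner}. */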
| public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { |
| @Override |
| public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, |
| Scan scan, RegionScanner s) throws IOException { |
| return new CustomScanner(s); |
| } |
| } |
| |
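  /**
   * Region observer that counts get/next invocations and can park them on a shared
   * CountDownLatch so the test can inspect block reference counts mid-operation.
   */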
| public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver { |
| static final AtomicInteger countOfNext = new AtomicInteger(0); |
| static final AtomicInteger countOfGets = new AtomicInteger(0); |
| static final AtomicBoolean waitForGets = new AtomicBoolean(false); |
| static final AtomicBoolean throwException = new AtomicBoolean(false); |
| private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>( |
| new CountDownLatch(0)); |
| |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| @Override |
| public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, |
| InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { |
| slowdownCode(e, false); |
| if (getLatch != null && getLatch.getCount() > 0) { |
| try { |
| getLatch.await(); |
        } catch (InterruptedException e1) {
          // ignore and continue
        }
| } |
| return hasMore; |
| } |
| |
| @Override |
| public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, |
| List<Cell> results) throws IOException { |
| slowdownCode(e, true); |
| } |
| |
| public static AtomicReference<CountDownLatch> getCdl() { |
| return cdl; |
| } |
| |
| private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, |
| boolean isGet) { |
| CountDownLatch latch = getCdl().get(); |
| try { |
| System.out.println(latch.getCount() + " is the count " + isGet); |
| if (latch.getCount() > 0) { |
| if (isGet) { |
| countOfGets.incrementAndGet(); |
| } else { |
| countOfNext.incrementAndGet(); |
| } |
| LOG.info("Waiting for the counterCountDownLatch"); |
| latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. |
| if (latch.getCount() > 0) { |
| throw new RuntimeException("Can't wait more"); |
| } |
| } |
| } catch (InterruptedException e1) { |
| LOG.error(e1.toString(), e1); |
| } |
| } |
| } |
| } |