| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.io.hfile; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
import static org.junit.Assert.assertNotNull;
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.MultithreadedTestUtil; |
| import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; |
| import org.apache.hadoop.hbase.io.ByteBuffAllocator; |
| import org.apache.hadoop.hbase.io.HeapSize; |
| import org.apache.hadoop.hbase.io.compress.Compression; |
| import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; |
| import org.apache.hadoop.hbase.nio.ByteBuff; |
| import org.apache.hadoop.hbase.util.ChecksumType; |
| |
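/**
 * Utilities for exercising {@link BlockCache} implementations: heap-size accounting checks,
 * single-threaded and multi-threaded cache/evict round trips, and generation of syntactically
 * valid {@link HFileBlock}s with random contents and unique cache keys.
 */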
| public class CacheTestUtils { |
| |
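  /** Generated test blocks are flagged as including MVCC/memstore timestamps. */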
| private static final boolean includesMemstoreTS = true; |
| |
| /** |
| * Just checks if heapsize grows when something is cached, and gets smaller |
| * when the same object is evicted |
| */ |
| |
| public static void testHeapSizeChanges(final BlockCache toBeTested, |
| final int blockSize) { |
| HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1); |
| long heapSize = ((HeapSize) toBeTested).heapSize(); |
| toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block); |
| |
    /* Caching a block should always increase the reported heap size. */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
| |
| toBeTested.evictBlock(blocks[0].blockName); |
| |
    /* After eviction, the heap size should return to its original value. */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
| } |
| |
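  /**
   * Stress-tests the cache from {@code numThreads} threads; each thread repeatedly takes one
   * generated block, caches it, reads it back, and evicts it. The test fails if the hit ratio
   * falls below {@code passingScore}. A usage sketch, with illustrative sizes only:
   *
   * <pre>
   *   BlockCache cache = ...; // the implementation under test
   *   CacheTestUtils.testCacheMultiThreaded(cache, 16 * 1024, 10, 10000, 0.98);
   * </pre>
   */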
| public static void testCacheMultiThreaded(final BlockCache toBeTested, |
| final int blockSize, final int numThreads, final int numQueries, |
| final double passingScore) throws Exception { |
| |
| Configuration conf = new Configuration(); |
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
| |
| final AtomicInteger totalQueries = new AtomicInteger(); |
| final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>(); |
| final AtomicInteger hits = new AtomicInteger(); |
| final AtomicInteger miss = new AtomicInteger(); |
| |
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
| blocksToTest.addAll(Arrays.asList(blocks)); |
| |
| for (int i = 0; i < numThreads; i++) { |
| TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { |
| @Override |
| public void doAnAction() throws Exception { |
          // poll() returns null once we have run out of blocks to test; stop then.
          HFileBlockPair ourBlock = blocksToTest.poll();
          if (ourBlock == null) {
            ctx.setStopFlag(true);
            return;
          }
          toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
          Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
          if (retrievedBlock != null) {
            assertEquals(ourBlock.block, retrievedBlock);
            toBeTested.evictBlock(ourBlock.blockName);
            hits.incrementAndGet();
            // After an explicit eviction, the block must no longer be retrievable.
            assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
          } else {
            miss.incrementAndGet();
          }
          totalQueries.incrementAndGet();
| } |
| }; |
| t.setDaemon(true); |
| ctx.addThread(t); |
| } |
| ctx.startThreads(); |
| while (!blocksToTest.isEmpty() && ctx.shouldRun()) { |
| Thread.sleep(10); |
| } |
| ctx.stop(); |
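    // Misses are legitimate (a cache may evict under memory pressure), but the
    // hit ratio must still meet the passing score.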
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Hit ratio below passing score. Hits: " + hits.get() + " Misses: " + miss.get());
    }
| } |
| |
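  /**
   * Single-threaded smoke test: caches {@code numBlocks} generated blocks, verifies that any
   * block still cached reads back equal, then re-caches each block and expects the duplicate
   * insert to be rejected (BucketCache, which tolerates duplicates, is exempt).
   */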
| public static void testCacheSimple(BlockCache toBeTested, int blockSize, |
| int numBlocks) throws Exception { |
| |
| HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks); |
| // Confirm empty |
| for (HFileBlockPair block : blocks) { |
| assertNull(toBeTested.getBlock(block.blockName, true, false, true)); |
| } |
| |
| // Add blocks |
| for (HFileBlockPair block : blocks) { |
| toBeTested.cacheBlock(block.blockName, block.block); |
| } |
| |
    // Check that every block still cached reads back with the right contents.
    // A null result is acceptable: the cache makes no guarantees about when it
    // evicts, so neither can we.
| |
| for (HFileBlockPair block : blocks) { |
| HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true); |
| if (buf != null) { |
| assertEquals(block.block, buf); |
| } |
| } |
| |
    // Re-cache blocks that are still present; implementations should reject the duplicate.
| |
| for (HFileBlockPair block : blocks) { |
| try { |
| if (toBeTested.getBlock(block.blockName, true, false, true) != null) { |
| toBeTested.cacheBlock(block.blockName, block.block); |
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache does not throw when re-caching an already cached block.
            fail("Cache should not allow re-caching a block");
          }
| } |
| } catch (RuntimeException re) { |
| // expected |
| } |
| } |
| |
| } |
| |
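  /**
   * Hammers a single key from {@code numThreads} reader threads while one extra thread
   * periodically evicts and re-caches the block, until {@code numQueries} reads complete.
   * Readers tolerate misses but assert that any block they do see has the expected contents.
   */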
| public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries) |
| throws Exception { |
| final BlockCacheKey key = new BlockCacheKey("key", 0); |
| final byte[] buf = new byte[5 * 1024]; |
| Arrays.fill(buf, (byte) 5); |
| |
| final ByteArrayCacheable bac = new ByteArrayCacheable(buf); |
| Configuration conf = new Configuration(); |
| MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); |
| |
| final AtomicInteger totalQueries = new AtomicInteger(); |
| toBeTested.cacheBlock(key, bac); |
| |
| for (int i = 0; i < numThreads; i++) { |
| TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { |
| @Override |
| public void doAnAction() throws Exception { |
| ByteArrayCacheable returned = |
| (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true); |
| if (returned != null) { |
| assertArrayEquals(buf, returned.buf); |
| } else { |
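            // The evictor thread below may have just removed the block; back
            // off briefly before the next read.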
| Thread.sleep(10); |
| } |
| totalQueries.incrementAndGet(); |
| } |
| }; |
| |
| t.setDaemon(true); |
| ctx.addThread(t); |
| } |
| |
| // add a thread to periodically evict and re-cache the block |
| final long blockEvictPeriod = 50; |
| TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { |
| @Override |
| public void doAnAction() throws Exception { |
| toBeTested.evictBlock(key); |
| toBeTested.cacheBlock(key, bac); |
| Thread.sleep(blockEvictPeriod); |
| } |
| }; |
| t.setDaemon(true); |
| ctx.addThread(t); |
| |
| ctx.startThreads(); |
| while (totalQueries.get() < numQueries && ctx.shouldRun()) { |
| Thread.sleep(10); |
| } |
| ctx.stop(); |
| } |
| |
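  /**
   * A minimal {@link Cacheable} that serializes as a 4-byte length prefix followed by the raw
   * payload bytes. The {@code Thread.yield()} calls in the serialization paths deliberately
   * widen race windows when this class is used from the multi-threaded tests above.
   */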
| public static class ByteArrayCacheable implements Cacheable { |
| |
| static final CacheableDeserializer<Cacheable> blockDeserializer = |
| new CacheableDeserializer<Cacheable>() { |
| @Override |
| public int getDeserializerIdentifier() { |
| return deserializerIdentifier; |
| } |
| |
| @Override |
| public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { |
| int len = b.getInt(); |
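          // Yield mid-read to widen the window for races with concurrent writers.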
| Thread.yield(); |
          byte[] buf = new byte[len];
| b.get(buf); |
| return new ByteArrayCacheable(buf); |
| } |
| }; |
| |
| final byte[] buf; |
| |
| public ByteArrayCacheable(byte[] buf) { |
| this.buf = buf; |
| } |
| |
| @Override |
| public long heapSize() { |
| return 4L + buf.length; |
| } |
| |
| @Override |
| public int getSerializedLength() { |
| return 4 + buf.length; |
| } |
| |
| @Override |
| public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { |
| destination.putInt(buf.length); |
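      // Yield mid-write to widen the window for races with concurrent readers.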
| Thread.yield(); |
| destination.put(buf); |
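      // Rewind so the receiving cache can read the serialized bytes from the start.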
| destination.rewind(); |
| } |
| |
| @Override |
| public CacheableDeserializer<Cacheable> getDeserializer() { |
| return blockDeserializer; |
| } |
| |
    // Register the deserializer so caches that serialize blocks can look it up
    // by identifier when reading blocks back.
    private static final int deserializerIdentifier;
| static { |
| deserializerIdentifier = CacheableDeserializerIdManager |
| .registerDeserializer(blockDeserializer); |
| } |
| |
| @Override |
| public BlockType getBlockType() { |
| return BlockType.DATA; |
| } |
| } |
| |
| |
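  /**
   * Generates {@code numBlocks} data blocks of {@code blockSize} bytes with random payloads.
   * Each block gets a plausible header (block magic, on-disk and uncompressed sizes, previous
   * block offset) and a cache key guaranteed to be unique within the returned array.
   */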
| public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { |
| HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; |
| Random rand = new Random(); |
| HashSet<String> usedStrings = new HashSet<>(); |
| for (int i = 0; i < numBlocks; i++) { |
| ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); |
| rand.nextBytes(cachedBuffer.array()); |
| cachedBuffer.rewind(); |
| int onDiskSizeWithoutHeader = blockSize; |
| int uncompressedSizeWithoutHeader = blockSize; |
| long prevBlockOffset = rand.nextLong(); |
| BlockType.DATA.write(cachedBuffer); |
| cachedBuffer.putInt(onDiskSizeWithoutHeader); |
| cachedBuffer.putInt(uncompressedSizeWithoutHeader); |
| cachedBuffer.putLong(prevBlockOffset); |
| cachedBuffer.rewind(); |
| HFileContext meta = new HFileContextBuilder() |
| .withHBaseCheckSum(false) |
| .withIncludesMvcc(includesMemstoreTS) |
| .withIncludesTags(false) |
| .withCompression(Compression.Algorithm.NONE) |
| .withBytesPerCheckSum(0) |
| .withChecksumType(ChecksumType.NULL) |
| .build(); |
| HFileBlock generated = |
| new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, |
| prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize, |
| onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta, |
| ByteBuffAllocator.HEAP); |
| |
      // Generate a unique key; retry on collision with an earlier block.
      String strKey = Long.toString(rand.nextLong());
      while (!usedStrings.add(strKey)) {
        strKey = Long.toString(rand.nextLong());
      }
| |
| returnedBlocks[i] = new HFileBlockPair(); |
| returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0); |
| returnedBlocks[i].block = generated; |
| } |
| return returnedBlocks; |
| } |
| |
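  /** A generated block paired with the cache key it should be stored under. */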
| public static class HFileBlockPair { |
| BlockCacheKey blockName; |
| HFileBlock block; |
| |
| public BlockCacheKey getBlockName() { |
| return this.blockName; |
| } |
| |
| public HFileBlock getBlock() { |
| return this.block; |
| } |
| } |
| |
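  /**
   * Caches {@code blockToCache} under {@code key}, reads it back, serializes the retrieved
   * block into {@code destBuffer}, and asserts it equals {@code expectedBuffer}. The reference
   * count taken by {@code getBlock} is released even if an assertion fails.
   */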
| public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, |
| Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) { |
| destBuffer.clear(); |
| cache.cacheBlock(key, blockToCache); |
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    assertNotNull("Block should have been cached under " + key, actualBlock);
    try {
      actualBlock.serialize(destBuffer, true);
      assertEquals(expectedBuffer, destBuffer);
    } finally {
      // Release the reference count increased by getBlock.
      actualBlock.release();
    }
| } |
| } |