blob: e0ff50d7b21ea5297086dbbd4821db9862f7e590 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
@Category({RegionServerTests.class, MediumTests.class})
public class TestMemStoreLAB {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMemStoreLAB.class);
private final static Configuration conf = new Configuration();
private static final byte[] rk = Bytes.toBytes("r1");
private static final byte[] cf = Bytes.toBytes("f");
private static final byte[] q = Bytes.toBytes("q");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
long globalMemStoreLimit =
(long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
* MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
}
/**
* Test a bunch of random allocations
*/
@Test
public void testLABRandomAllocation() {
Random rand = new Random();
MemStoreLAB mslab = new MemStoreLABImpl();
int expectedOff = 0;
ByteBuffer lastBuffer = null;
int lastChunkId = -1;
// 100K iterations by 0-1K alloc -> 50MB expected
// should be reasonable for unit test and also cover wraparound
// behavior
for (int i = 0; i < 100000; i++) {
int valSize = rand.nextInt(3);
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
int size = kv.getSerializedSize();
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is an int we need to account for those 4 bytes
expectedOff = Bytes.SIZEOF_INT;
lastBuffer = newKv.getBuffer();
int chunkId = newKv.getBuffer().getInt(0);
assertTrue("chunkid should be different", chunkId != lastChunkId);
lastChunkId = chunkId;
}
assertEquals(expectedOff, newKv.getOffset());
assertTrue("Allocation overruns buffer",
newKv.getOffset() + size <= newKv.getBuffer().capacity());
expectedOff += size;
}
}
@Test
public void testLABLargeAllocation() {
MemStoreLAB mslab = new MemStoreLABImpl();
KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
Cell newCell = mslab.copyCellInto(kv);
assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
}
/**
* Test allocation from lots of threads, making sure the results don't
* overlap in any way
*/
@Test
public void testLABThreading() throws Exception {
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
final AtomicInteger totalAllocated = new AtomicInteger();
final MemStoreLAB mslab = new MemStoreLABImpl();
List<List<AllocRecord>> allocations = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
allocations.add(allocsByThisThread);
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
private Random r = new Random();
@Override
public void doAnAction() throws Exception {
int valSize = r.nextInt(3);
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
int size = kv.getSerializedSize();
ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
totalAllocated.addAndGet(size);
allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
}
};
ctx.addThread(t);
}
ctx.startThreads();
while (totalAllocated.get() < 50*1024*1000 && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
// Partition the allocations by the actual byte[] they point into,
// make sure offsets are unique for each chunk
Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
Maps.newHashMap();
int sizeCounted = 0;
for (AllocRecord rec : Iterables.concat(allocations)) {
sizeCounted += rec.size;
if (rec.size == 0) {
continue;
}
Map<Integer, AllocRecord> mapForThisByteArray =
mapsByChunk.get(rec.alloc);
if (mapForThisByteArray == null) {
mapForThisByteArray = Maps.newTreeMap();
mapsByChunk.put(rec.alloc, mapForThisByteArray);
}
AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
assertNull("Already had an entry " + oldVal + " for allocation " + rec,
oldVal);
}
assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
// Now check each byte array to make sure allocations don't overlap
for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
// since we add the chunkID at the 0th offset of the chunk and the
// chunkid is an int we need to account for those 4 bytes
int expectedOff = Bytes.SIZEOF_INT;
for (AllocRecord alloc : allocsInChunk.values()) {
assertEquals(expectedOff, alloc.offset);
assertTrue("Allocation overruns buffer",
alloc.offset + alloc.size <= alloc.alloc.capacity());
expectedOff += alloc.size;
}
}
}
/**
* Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
* there's no memory leak (HBASE-16195)
* @throws Exception if any error occurred
*/
@Test
public void testLABChunkQueue() throws Exception {
ChunkCreator oldInstance = null;
try {
MemStoreLABImpl mslab = new MemStoreLABImpl();
// by default setting, there should be no chunks initialized in the pool
assertTrue(mslab.getPooledChunks().isEmpty());
oldInstance = ChunkCreator.instance;
ChunkCreator.instance = null;
// reset mslab with chunk pool
Configuration conf = HBaseConfiguration.create();
conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
// set chunk size to default max alloc size, so we could easily trigger chunk retirement
conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
// reconstruct mslab
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
ChunkCreator.clearDisableFlag();
mslab = new MemStoreLABImpl(conf);
// launch multiple threads to trigger frequent chunk retirement
List<Thread> threads = new ArrayList<>();
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
for (int i = 0; i < 10; i++) {
threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
}
for (Thread thread : threads) {
thread.start();
}
// let it run for some time
Thread.sleep(1000);
for (Thread thread : threads) {
thread.interrupt();
}
boolean threadsRunning = true;
boolean alive = false;
while (threadsRunning) {
alive = false;
for (Thread thread : threads) {
if (thread.isAlive()) {
alive = true;
break;
}
}
if (!alive) {
threadsRunning = false;
}
}
// none of the chunkIds would have been returned back
assertTrue("All the chunks must have been cleared",
ChunkCreator.instance.numberOfMappedChunks() != 0);
int pooledChunksNum = mslab.getPooledChunks().size();
// close the mslab
mslab.close();
// make sure all chunks where reclaimed back to pool
int queueLength = mslab.getNumOfChunksReturnedToPool();
assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ " after mslab closed but actually: " + (pooledChunksNum-queueLength),
pooledChunksNum-queueLength == 0);
} finally {
ChunkCreator.instance = oldInstance;
}
}
private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
Cell cellToCopyInto) {
Thread thread = new Thread() {
volatile boolean stopped = false;
@Override
public void run() {
while (!stopped) {
// keep triggering chunk retirement
mslab.copyCellInto(cellToCopyInto);
}
}
@Override
public void interrupt() {
this.stopped = true;
}
};
thread.setName(threadName);
thread.setDaemon(true);
return thread;
}
private static class AllocRecord implements Comparable<AllocRecord>{
private final ByteBuffer alloc;
private final int offset;
private final int size;
public AllocRecord(ByteBuffer alloc, int offset, int size) {
super();
this.alloc = alloc;
this.offset = offset;
this.size = size;
}
@Override
public int compareTo(AllocRecord e) {
if (alloc != e.alloc) {
throw new RuntimeException("Can only compare within a particular array");
}
return Ints.compare(this.offset, e.offset);
}
@Override
public String toString() {
return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
}
}
}