blob: ab68e405ca7c49a50f847594195a0dfa7faf4f74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.Iterables;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.RandomAccessReader;
import static org.junit.Assert.*;
public class BufferPoolTest
{
private BufferPool bufferPool;
@Before
public void setUp()
{
bufferPool = new BufferPool("test_pool", 8 * 1024 * 1024, true);
}
@Test
public void testGetPut()
{
final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assertEquals(true, buffer.isDirect());
assertEquals(size, bufferPool.usedSizeInBytes());
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
bufferPool.put(buffer);
assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
assertEquals(0, bufferPool.usedSizeInBytes());
}
@Test
public void testTryGet()
{
final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
ByteBuffer buffer = bufferPool.tryGet(size);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assertEquals(true, buffer.isDirect());
assertEquals(size, bufferPool.usedSizeInBytes());
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
bufferPool.put(buffer);
assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
assertEquals(0, bufferPool.usedSizeInBytes());
}
@Test
public void testPageAligned()
{
final int size = 1024;
for (int i = size;
i <= BufferPool.NORMAL_CHUNK_SIZE;
i += size)
{
checkPageAligned(i);
}
}
private void checkPageAligned(int size)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assertTrue(buffer.isDirect());
long address = MemoryUtil.getAddress(buffer);
assertTrue((address % MemoryUtil.pageSize()) == 0);
bufferPool.put(buffer);
}
@Test
public void testDifferentSizes() throws InterruptedException
{
final int size1 = 1024;
final int size2 = 2048;
ByteBuffer buffer1 = bufferPool.get(size1, BufferType.OFF_HEAP);
assertNotNull(buffer1);
assertEquals(size1, buffer1.capacity());
assertEquals(size1, bufferPool.usedSizeInBytes());
ByteBuffer buffer2 = bufferPool.get(size2, BufferType.OFF_HEAP);
assertNotNull(buffer2);
assertEquals(size2, buffer2.capacity());
assertEquals(size1 + size2, bufferPool.usedSizeInBytes());
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
bufferPool.put(buffer1);
bufferPool.put(buffer2);
assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
assertEquals(0, bufferPool.usedSizeInBytes());
}
@Test
public void testMaxMemoryExceededDirect()
{
requestDoubleMaxMemory();
}
@Test
public void testMaxMemoryExceededHeap()
{
requestDoubleMaxMemory();
}
@Test
public void testMaxMemoryExceeded_SameAsChunkSize()
{
requestDoubleMaxMemory();
}
@Test
public void testMaxMemoryExceeded_SmallerThanChunkSize()
{
bufferPool = new BufferPool("test_pool", BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2, false);
requestDoubleMaxMemory();
}
@Test
public void testRecycle()
{
requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, 3 * BufferPool.NORMAL_CHUNK_SIZE);
}
private void requestDoubleMaxMemory()
{
requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * bufferPool.memoryUsageThreshold()));
}
private void requestUpToSize(int bufferSize, int totalSize)
{
final int numBuffers = totalSize / bufferSize;
List<ByteBuffer> buffers = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++)
{
ByteBuffer buffer = bufferPool.get(bufferSize, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(bufferSize, buffer.capacity());
assertTrue(buffer.isDirect());
buffers.add(buffer);
}
for (ByteBuffer buffer : buffers)
bufferPool.put(buffer);
}
@Test
public void testBigRequest()
{
final int size = BufferPool.NORMAL_CHUNK_SIZE + 1;
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
bufferPool.put(buffer);
}
@Test
public void testFillUpChunks()
{
final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
final int numBuffers = BufferPool.NORMAL_CHUNK_SIZE / size;
List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers);
List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++)
buffers1.add(bufferPool.get(size, BufferType.OFF_HEAP));
BufferPool.Chunk chunk1 = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk1);
for (int i = 0; i < numBuffers; i++)
buffers2.add(bufferPool.get(size, BufferType.OFF_HEAP));
assertEquals(2, bufferPool.unsafeNumChunks());
for (ByteBuffer buffer : buffers1)
bufferPool.put(buffer);
assertEquals(1, bufferPool.unsafeNumChunks());
for (ByteBuffer buffer : buffers2)
bufferPool.put(buffer);
assertEquals(0, bufferPool.unsafeNumChunks());
buffers2.clear();
}
@Test
public void testOutOfOrderFrees()
{
final int size = 4096;
final int maxFreeSlots = BufferPool.NORMAL_CHUNK_SIZE / size;
final int[] idxs = new int[maxFreeSlots];
for (int i = 0; i < maxFreeSlots; i++)
idxs[i] = i;
doTestFrees(size, maxFreeSlots, idxs);
}
@Test
public void testInOrderFrees()
{
final int size = 4096;
final int maxFreeSlots = BufferPool.NORMAL_CHUNK_SIZE / size;
final int[] idxs = new int[maxFreeSlots];
for (int i = 0; i < maxFreeSlots; i++)
idxs[i] = maxFreeSlots - 1 - i;
doTestFrees(size, maxFreeSlots, idxs);
}
@Test
public void testRandomFrees()
{
doTestRandomFrees(12345567878L);
bufferPool.unsafeReset();
doTestRandomFrees(20452249587L);
bufferPool.unsafeReset();
doTestRandomFrees(82457252948L);
bufferPool.unsafeReset();
doTestRandomFrees(98759284579L);
bufferPool.unsafeReset();
doTestRandomFrees(19475257244L);
}
private void doTestRandomFrees(long seed)
{
final int size = 4096;
final int maxFreeSlots = BufferPool.NORMAL_CHUNK_SIZE / size;
final int[] idxs = new int[maxFreeSlots];
for (int i = 0; i < maxFreeSlots; i++)
idxs[i] = maxFreeSlots - 1 - i;
Random rnd = new Random();
rnd.setSeed(seed);
for (int i = idxs.length - 1; i > 0; i--)
{
int idx = rnd.nextInt(i+1);
int v = idxs[idx];
idxs[idx] = idxs[i];
idxs[i] = v;
}
doTestFrees(size, maxFreeSlots, idxs);
}
private void doTestFrees(final int size, final int maxFreeSlots, final int[] toReleaseIdxs)
{
List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots);
for (int i = 0; i < maxFreeSlots; i++)
{
buffers.add(bufferPool.get(size, BufferType.OFF_HEAP));
}
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertFalse(chunk.isFree());
int freeSize = BufferPool.NORMAL_CHUNK_SIZE - maxFreeSlots * size;
assertEquals(freeSize, chunk.free());
for (int i : toReleaseIdxs)
{
ByteBuffer buffer = buffers.get(i);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
bufferPool.put(buffer);
freeSize += size;
if (freeSize == chunk.capacity())
assertEquals(0, chunk.free());
else
assertEquals(freeSize, chunk.free());
}
assertFalse(chunk.isFree());
}
@Test
public void testDifferentSizeBuffersOnOneChunk()
{
int[] sizes = new int[] {
5, 1024, 4096, 8, 16000, 78, 512, 256, 63, 55, 89, 90, 255, 32, 2048, 128
};
int sum = 0;
List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
for (int i = 0; i < sizes.length; i++)
{
ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertTrue(buffer.capacity() >= sizes[i]);
buffers.add(buffer);
sum += bufferPool.unsafeCurrentChunk().roundUp(buffer.capacity());
}
// else the test will fail, adjust sizes as required
assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
Random rnd = new Random();
rnd.setSeed(298347529L);
while (buffers.size() > 1)
{
int index = rnd.nextInt(buffers.size());
ByteBuffer buffer = buffers.remove(index);
bufferPool.put(buffer);
}
bufferPool.put(buffers.remove(0));
assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(0, chunk.free());
}
@Test
public void testChunkExhausted()
{
final int size = BufferPool.NORMAL_CHUNK_SIZE / 64; // 1kibibit
int[] sizes = new int[128];
Arrays.fill(sizes, size);
int sum = 0;
List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
for (int i = 0; i < sizes.length; i++)
{
ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertTrue(buffer.capacity() >= sizes[i]);
buffers.add(buffer);
sum += buffer.capacity();
}
// else the test will fail, adjust sizes as required
assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
for (int i = 0; i < sizes.length; i++)
{
bufferPool.put(buffers.get(i));
}
assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(0, chunk.free());
}
@Test
public void testCompactIfOutOfCapacity()
{
final int size = 4096;
final int numBuffersInChunk = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / size;
List<ByteBuffer> buffers = new ArrayList<>(numBuffersInChunk);
Set<Long> addresses = new HashSet<>(numBuffersInChunk);
for (int i = 0; i < numBuffersInChunk; i++)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
buffers.add(buffer);
addresses.add(MemoryUtil.getAddress(buffer));
}
for (int i = numBuffersInChunk - 1; i >= 0; i--)
bufferPool.put(buffers.get(i));
buffers.clear();
for (int i = 0; i < numBuffersInChunk; i++)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assert addresses.remove(MemoryUtil.getAddress(buffer));
buffers.add(buffer);
}
assertTrue(addresses.isEmpty()); // all 5 released buffers were used
for (ByteBuffer buffer : buffers)
bufferPool.put(buffer);
}
@Test
public void testHeapBuffer()
{
ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP);
assertNotNull(buffer);
assertEquals(1024, buffer.capacity());
assertFalse(buffer.isDirect());
assertNotNull(buffer.array());
bufferPool.put(buffer);
}
@Test
public void testSingleBufferOneChunk()
{
checkBuffer(0);
checkBuffer(1);
checkBuffer(2);
checkBuffer(4);
checkBuffer(5);
checkBuffer(8);
checkBuffer(16);
checkBuffer(32);
checkBuffer(64);
checkBuffer(65);
checkBuffer(127);
checkBuffer(128);
checkBuffer(129);
checkBuffer(255);
checkBuffer(256);
checkBuffer(512);
checkBuffer(1024);
checkBuffer(2048);
checkBuffer(4096);
checkBuffer(8192);
checkBuffer(16384);
checkBuffer(16385);
checkBuffer(32767);
checkBuffer(32768);
checkBuffer(32769);
checkBuffer(33172);
checkBuffer(33553);
checkBuffer(36000);
checkBuffer(65535);
checkBuffer(65536);
checkBuffer(65537);
}
private void checkBuffer(int size)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE)
{
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size));
}
bufferPool.put(buffer);
}
@Test
public void testMultipleBuffersOneChunk()
{
checkBuffers(32768, 33553);
checkBuffers(32768, 32768);
checkBuffers(48450, 33172);
checkBuffers(32768, 15682, 33172);
}
private void checkBuffers(int ... sizes)
{
List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
for (int size : sizes)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
buffers.add(buffer);
}
for (ByteBuffer buffer : buffers)
bufferPool.put(buffer);
}
@Test
public void testBuffersWithGivenSlots()
{
checkBufferWithGivenSlots(21241, (-1L << 27) ^ (1L << 40));
}
private void checkBufferWithGivenSlots(int size, long freeSlots)
{
//first allocate to make sure there is a chunk
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
// now get the current chunk and override the free slots mask
BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
long oldFreeSlots = chunk.setFreeSlots(freeSlots);
// now check we can still get the buffer with the free slots mask changed
ByteBuffer buffer2 = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
bufferPool.put(buffer2);
// unsafeReset the free slots
chunk.setFreeSlots(oldFreeSlots);
bufferPool.put(buffer);
}
@Test
public void testZeroSizeRequest()
{
ByteBuffer buffer = bufferPool.get(0, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(0, buffer.capacity());
bufferPool.put(buffer);
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeSizeRequest()
{
bufferPool.get(-1, BufferType.OFF_HEAP);
}
@Test
public void testMT_SameSizeImmediateReturn() throws InterruptedException
{
checkMultipleThreads(40, 1, true, RandomAccessReader.DEFAULT_BUFFER_SIZE);
}
@Test
public void testMT_SameSizePostponedReturn() throws InterruptedException
{
checkMultipleThreads(40, 1, false, RandomAccessReader.DEFAULT_BUFFER_SIZE);
}
@Test
public void testMT_TwoSizesOneBufferImmediateReturn() throws InterruptedException
{
checkMultipleThreads(40, 1, true, 1024, 2048);
}
@Test
public void testMT_TwoSizesOneBufferPostponedReturn() throws InterruptedException
{
checkMultipleThreads(40, 1, false, 1024, 2048);
}
@Test
public void testMT_TwoSizesTwoBuffersImmediateReturn() throws InterruptedException
{
checkMultipleThreads(40, 2, true, 1024, 2048);
}
@Test
public void testMT_TwoSizesTwoBuffersPostponedReturn() throws InterruptedException
{
checkMultipleThreads(40, 2, false, 1024, 2048);
}
@Test
public void testMT_MultipleSizesOneBufferImmediateReturn() throws InterruptedException
{
checkMultipleThreads(40,
1,
true,
1024,
2048,
3072,
4096,
5120);
}
@Test
public void testMT_MultipleSizesOneBufferPostponedReturn() throws InterruptedException
{
checkMultipleThreads(40,
1,
false,
1024,
2048,
3072,
4096,
5120);
}
@Test
public void testMT_MultipleSizesMultipleBuffersImmediateReturn() throws InterruptedException
{
checkMultipleThreads(40,
4,
true,
1024,
2048,
3072,
4096,
5120);
}
@Test
public void testMT_MultipleSizesMultipleBuffersPostponedReturn() throws InterruptedException
{
checkMultipleThreads(40,
3,
false,
1024,
2048,
3072,
4096,
5120);
}
private void checkMultipleThreads(int threadCount, int numBuffersPerThread, final boolean returnImmediately, final int ... sizes) throws InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
final CountDownLatch finished = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++)
{
final int[] threadSizes = new int[numBuffersPerThread];
for (int j = 0; j < threadSizes.length; j++)
threadSizes[j] = sizes[(i * numBuffersPerThread + j) % sizes.length];
final Random rand = new Random();
executorService.submit(new Runnable()
{
@Override
public void run()
{
try
{
Thread.sleep(rand.nextInt(3));
List<ByteBuffer> toBeReturned = new ArrayList<ByteBuffer>(threadSizes.length);
for (int j = 0; j < threadSizes.length; j++)
{
ByteBuffer buffer = bufferPool.get(threadSizes[j], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(threadSizes[j], buffer.capacity());
for (int i = 0; i < 10; i++)
buffer.putInt(i);
buffer.rewind();
Thread.sleep(rand.nextInt(3));
for (int i = 0; i < 10; i++)
assertEquals(i, buffer.getInt());
if (returnImmediately)
bufferPool.put(buffer);
else
toBeReturned.add(buffer);
assertTrue(bufferPool.sizeInBytes() > 0);
}
Thread.sleep(rand.nextInt(3));
for (ByteBuffer buffer : toBeReturned)
bufferPool.put(buffer);
}
catch (Exception ex)
{
ex.printStackTrace();
fail(ex.getMessage());
}
finally
{
finished.countDown();
}
}
});
}
finished.await();
assertEquals(0, executorService.shutdownNow().size());
// Make sure thread local storage gets GC-ed
for (int i = 0; i < 5; i++)
{
System.gc();
Thread.sleep(100);
}
}
@Ignore
public void testMultipleThreadsReleaseSameBuffer() throws InterruptedException
{
doMultipleThreadsReleaseBuffers(45, 4096);
}
@Ignore
public void testMultipleThreadsReleaseDifferentBuffer() throws InterruptedException
{
doMultipleThreadsReleaseBuffers(45, 4096, 8192);
}
private void doMultipleThreadsReleaseBuffers(final int threadCount, final int ... sizes) throws InterruptedException
{
final ByteBuffer[] buffers = new ByteBuffer[sizes.length];
int sum = 0;
for (int i = 0; i < sizes.length; i++)
{
buffers[i] = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffers[i]);
assertEquals(sizes[i], buffers[i].capacity());
sum += bufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity());
}
final BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertFalse(chunk.isFree());
// if we use multiple chunks the test will fail, adjust sizes accordingly
assertTrue(sum < BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
final CountDownLatch finished = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++)
{
final int idx = i % sizes.length;
final ByteBuffer buffer = buffers[idx];
executorService.submit(new Runnable()
{
@Override
public void run()
{
try
{
assertNotSame(chunk, bufferPool.unsafeCurrentChunk());
bufferPool.put(buffer);
}
catch (AssertionError ex)
{ //this is expected if we release a buffer more than once
ex.printStackTrace();
}
catch (Throwable t)
{
t.printStackTrace();
fail(t.getMessage());
}
finally
{
finished.countDown();
}
}
});
}
finished.await();
assertEquals(0, executorService.shutdownNow().size());
executorService = null;
// Make sure thread local storage gets GC-ed
System.gc();
System.gc();
System.gc();
assertTrue(bufferPool.unsafeCurrentChunk().isFree());
//make sure the main thread can still allocate buffers
ByteBuffer buffer = bufferPool.get(sizes[0], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(sizes[0], buffer.capacity());
bufferPool.put(buffer);
}
@Test
public void testOverflowAllocation()
{
int macroChunkSize = BufferPool.GlobalPool.MACRO_CHUNK_SIZE;
int allocationSize = BufferPool.NORMAL_CHUNK_SIZE;
int allocations = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / allocationSize;
// occupy entire buffer pool
List<ByteBuffer> buffers = new ArrayList<>();
allocate(allocations, allocationSize, buffers);
assertEquals(macroChunkSize, bufferPool.sizeInBytes());
assertEquals(macroChunkSize, bufferPool.usedSizeInBytes());
assertEquals(0, bufferPool.overflowMemoryInBytes());
// allocate overflow due to pool exhaust
ByteBuffer overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.OFF_HEAP);
assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
// free all buffer
bufferPool.put(overflowBuffer);
release(buffers);
assertEquals(macroChunkSize, bufferPool.sizeInBytes());
assertEquals(0, bufferPool.usedSizeInBytes());
assertEquals(0, bufferPool.overflowMemoryInBytes());
// allocate overflow due to on-heap
overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.ON_HEAP);
assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
bufferPool.put(overflowBuffer);
// allocate overflow due to over allocation size
overflowBuffer = bufferPool.get(2 * BufferPool.NORMAL_CHUNK_SIZE, BufferType.ON_HEAP);
assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
}
@Test
public void testRecyclePartialFreeChunk()
{
// normal chunk size is 128KiB
int halfNormalChunk = BufferPool.NORMAL_CHUNK_SIZE / 2; // 64KiB, half of normal chunk
List<ByteBuffer> toRelease = new ArrayList<>();
// allocate three buffers on different chunks
ByteBuffer buffer0 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
BufferPool.Chunk chunk0 = BufferPool.Chunk.getParentChunk(buffer0);
assertFalse(chunk0.isFree());
allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
ByteBuffer buffer1 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
BufferPool.Chunk chunk1 = BufferPool.Chunk.getParentChunk(buffer1);
assertFalse(chunk1.isFree());
assertNotEquals(chunk0, chunk1);
allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
ByteBuffer buffer2 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
BufferPool.Chunk chunk2 = BufferPool.Chunk.getParentChunk(buffer2);
assertFalse(chunk2.isFree());
assertNotEquals(chunk0, chunk2);
assertNotEquals(chunk1, chunk2);
allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
// now all 3 chunks in local pool is full, allocate one more buffer to evict chunk2
ByteBuffer buffer3 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
BufferPool.Chunk chunk3 = BufferPool.Chunk.getParentChunk(buffer3);
assertNotEquals(chunk0, chunk3);
assertNotEquals(chunk1, chunk3);
assertNotEquals(chunk2, chunk3);
// verify chunk2 got evicted, it doesn't have a owner
assertNotNull(chunk0.owner());
assertEquals(BufferPool.Chunk.Status.IN_USE, chunk0.status());
assertNotNull(chunk1.owner());
assertEquals(BufferPool.Chunk.Status.IN_USE, chunk1.status());
assertNull(chunk2.owner());
assertEquals(BufferPool.Chunk.Status.EVICTED, chunk2.status());
// release half buffers for chunk0/1/2
release(toRelease);
BufferPool.Chunk partiallyFreed = chunk2;
// try to recirculate chunk2 and verify freed space
assertFalse(bufferPool.globalPool().isFullyFreed(partiallyFreed));
assertTrue(bufferPool.globalPool().isPartiallyFreed(partiallyFreed));
assertEquals(BufferPool.Chunk.Status.IN_USE, partiallyFreed.status());
assertEquals(halfNormalChunk, partiallyFreed.free());
ByteBuffer buffer = partiallyFreed.get(halfNormalChunk, false, null);
assertEquals(halfNormalChunk, buffer.capacity());
// cleanup allocated buffers
for (ByteBuffer buf : Arrays.asList(buffer0, buffer1, buffer2, buffer3, buffer))
bufferPool.put(buf);
// verify that fully freed chunk are prioritized over partially freed chunks
List<BufferPool.Chunk> remainingChunks = new ArrayList<>();
BufferPool.Chunk chunkForAllocation;
while ((chunkForAllocation = bufferPool.globalPool().get()) != null)
remainingChunks.add(chunkForAllocation);
int totalNormalChunks = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / BufferPool.NORMAL_CHUNK_SIZE; // 64;
assertEquals(totalNormalChunks, remainingChunks.size());
assertSame(partiallyFreed, remainingChunks.get(remainingChunks.size() - 1)); // last one is partially freed
// cleanup polled chunks
remainingChunks.forEach(BufferPool.Chunk::release);
}
@Test
public void testTinyPool()
{
int total = 0;
final int size = BufferPool.TINY_ALLOCATION_UNIT;
final int allocationPerChunk = 64;
// occupy 3 tiny chunks
List<ByteBuffer> buffers0 = new ArrayList<>();
BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0);
assertTrue(chunk0.owner().isTinyPool());
List<ByteBuffer> buffers1 = new ArrayList<>();
BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1);
assertTrue(chunk1.owner().isTinyPool());
List<ByteBuffer> buffers2 = new ArrayList<>();
BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2);
assertTrue(chunk2.owner().isTinyPool());
total += 3 * BufferPool.TINY_CHUNK_SIZE;
assertEquals(total, bufferPool.usedSizeInBytes());
// allocate another tiny chunk.. chunk2 should be evicted
List<ByteBuffer> buffers3 = new ArrayList<>();
BufferPool.Chunk chunk3 = allocate(allocationPerChunk, size, buffers3);
assertTrue(chunk3.owner().isTinyPool());
total += BufferPool.TINY_CHUNK_SIZE;
assertEquals(total, bufferPool.usedSizeInBytes());
// verify chunk2 is full and evicted
assertEquals(0, chunk2.free());
assertNull(chunk2.owner());
// release chunk2's buffer
for (int i = 0; i < buffers2.size(); i++)
{
bufferPool.put(buffers2.get(i));
total -= buffers2.get(i).capacity();
assertEquals(total, bufferPool.usedSizeInBytes());
}
// cleanup allocated buffers
for (ByteBuffer buffer : Iterables.concat(buffers0, buffers1, buffers3))
bufferPool.put(buffer);
}
@Test
public void testReleaseLocal()
{
final int size = BufferPool.TINY_ALLOCATION_UNIT;
final int allocationPerChunk = 64;
// occupy 3 tiny chunks
List<ByteBuffer> buffers0 = new ArrayList<>();
BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0);
List<ByteBuffer> buffers1 = new ArrayList<>();
BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1);
List<ByteBuffer> buffers2 = new ArrayList<>();
BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2);
// release them from the pool
bufferPool.releaseLocal();
assertNull(chunk0.owner());
assertNull(chunk1.owner());
assertNull(chunk2.owner());
assertEquals(BufferPool.Chunk.Status.EVICTED, chunk0.status());
assertEquals(BufferPool.Chunk.Status.EVICTED, chunk1.status());
assertEquals(BufferPool.Chunk.Status.EVICTED, chunk2.status());
// cleanup allocated buffers, that should still work fine even though we released them from the localPool
for (ByteBuffer buffer : Iterables.concat(buffers0, buffers1, buffers2))
bufferPool.put(buffer);
assertEquals(0, bufferPool.usedSizeInBytes());
}
@Test
public void testPuttingUnusedPortion()
{
final int expectedCapacity = BufferPool.TINY_ALLOCATION_UNIT * 4;
final int quarterUnit = BufferPool.TINY_ALLOCATION_UNIT / 4;
final int requestedCapacity = expectedCapacity - 3 * quarterUnit;
ByteBuffer buffer = bufferPool.getAtLeast(requestedCapacity, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(expectedCapacity, buffer.capacity());
assertEquals(expectedCapacity, bufferPool.usedSizeInBytes());
buffer.limit(requestedCapacity); // 3.25 x unit
bufferPool.putUnusedPortion(buffer);
// the unused portion was too small to be returned, the buffer remains unchanged
assertEquals(expectedCapacity, buffer.capacity());
// used size is didn't change either
assertEquals(expectedCapacity, bufferPool.usedSizeInBytes());
buffer.limit(expectedCapacity - BufferPool.TINY_ALLOCATION_UNIT); // 3.0 x unit
bufferPool.putUnusedPortion(buffer);
// now we should notice a change
assertEquals(BufferPool.TINY_ALLOCATION_UNIT * 3, buffer.capacity());
assertEquals(BufferPool.TINY_ALLOCATION_UNIT * 3, bufferPool.usedSizeInBytes());
bufferPool.put(buffer);
assertEquals(0, bufferPool.usedSizeInBytes());
}
private BufferPool.Chunk allocate(int num, int bufferSize, List<ByteBuffer> buffers)
{
for (int i = 0; i < num; i++)
buffers.add(bufferPool.get(bufferSize, BufferType.OFF_HEAP));
return BufferPool.Chunk.getParentChunk(buffers.get(buffers.size() - 1));
}
private void release(List<ByteBuffer> toRelease)
{
for (ByteBuffer buffer : toRelease)
bufferPool.put(buffer);
}
}