/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.DynamicList;
import static org.junit.Assert.*;
/**
* Long BufferPool test - make sure that the BufferPool allocates and recycles
* ByteBuffers under heavy concurrent usage.
*
 * The test creates two groups of threads:
 *
 * - the burn producer/consumer pair that allocates 1/10 of the pool size and then returns
 *   all the memory to the pool; 50% is freed by the producer, 50% is passed to the consumer thread.
 *
 * - a ring of worker threads that allocate buffers and either free them immediately,
 *   or pass them to the next worker thread to be freed on their behalf. Periodically
 *   each thread frees all of its memory.
*
 * While the burn/worker threads run, the original main thread checks that all of the threads are
 * still making progress (no deadlocks or exits from assertion failures, with a 10s stall timeout),
 * and that every chunk has been freed at least once during the previous cycle (if that was possible).
*
 * The test does not expect to survive out-of-memory errors, so it needs sufficient heap memory
 * for non-direct buffers and the debug tracking objects that check the allocated buffers.
* (The timing is very interesting when Xmx is lowered to increase garbage collection pauses, but do
* not set it too low).
*/
public class LongBufferPoolTest
{
private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class);
private static final int AVG_BUFFER_SIZE = 16 << 10;
private static final int STDEV_BUFFER_SIZE = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
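    /**
     * Tracks the round in which each chunk was last acquired from and recycled back to the
     * global pool, so that {@link #check()} can assert every chunk keeps circulating while
     * the test runs.
     */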
static final class Debug implements BufferPool.Debug
{
static class DebugChunk
{
            volatile long lastAcquired = 0; // round in which the chunk was last acquired from the global pool
            volatile long lastRecycled = 0; // round in which the chunk was last recycled back to the global pool
static DebugChunk get(BufferPool.Chunk chunk)
{
if (chunk.debugAttachment == null)
chunk.debugAttachment = new DebugChunk();
return (DebugChunk) chunk.debugAttachment;
}
}
AtomicLong recycleRound = new AtomicLong(1);
final List<BufferPool.Chunk> normalChunks = new ArrayList<>();
public synchronized void registerNormal(BufferPool.Chunk chunk)
{
chunk.debugAttachment = new DebugChunk();
normalChunks.add(chunk);
}
@Override
public void acquire(BufferPool.Chunk chunk)
{
DebugChunk.get(chunk).lastAcquired = recycleRound.get();
}
@Override
public void recycleNormal(BufferPool.Chunk oldVersion, BufferPool.Chunk newVersion)
{
newVersion.debugAttachment = oldVersion.debugAttachment;
DebugChunk.get(oldVersion).lastRecycled = recycleRound.get();
}
@Override
public void recyclePartial(BufferPool.Chunk chunk)
{
DebugChunk.get(chunk).lastRecycled = recycleRound.get();
}
public synchronized void check()
{
long lastRecycledMax = 0;
long currentRound = recycleRound.get();
for (BufferPool.Chunk chunk : normalChunks)
{
DebugChunk dc = DebugChunk.get(chunk);
assert dc.lastRecycled >= dc.lastAcquired: "Last recycled " + dc.lastRecycled + " < last acquired " + dc.lastAcquired;
lastRecycledMax = Math.max(lastRecycledMax, dc.lastRecycled);
}
assert lastRecycledMax == currentRound : "No chunk recycled in round " + currentRound + ". " +
"Last chunk recycled in round " + lastRecycledMax + '.';
recycleRound.incrementAndGet();
}
}
@BeforeClass
public static void setup() throws Exception
{
DatabaseDescriptor.daemonInitialization();
}
@AfterClass
public static void teardown()
{
BufferPools.forChunkCache().unsafeReset();
BufferPools.forNetworking().unsafeReset();
}
@Test
public void testPoolAllocateWithRecyclePartially() throws InterruptedException, ExecutionException, BrokenBarrierException, TimeoutException
{
testPoolAllocate(true);
}
@Test
public void testPoolAllocateWithoutRecyclePartially() throws InterruptedException, ExecutionException, BrokenBarrierException, TimeoutException
{
testPoolAllocate(false);
}
private void testPoolAllocate(boolean recyclePartially) throws InterruptedException, ExecutionException, BrokenBarrierException, TimeoutException
{
BufferPool pool = new BufferPool("test_pool", 16 << 20, recyclePartially);
testAllocate(pool, Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L));
}
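    /**
     * A buffer tagged with the long value that was written across its contents, so corruption
     * caused by premature recycling can be detected by {@link #validate()}.
     */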
private static final class BufferCheck
{
final ByteBuffer buffer;
final long val;
DynamicList.Node<BufferCheck> listnode;
private BufferCheck(ByteBuffer buffer, long val)
{
this.buffer = buffer;
this.val = val;
}
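        // note: init() writes and validate() checks whole longs only while more than 8 bytes
        // remain, so the final 1-8 bytes of each buffer are never written or checked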
void validate()
{
ByteBuffer read = buffer.duplicate();
while (read.remaining() > 8)
assert read.getLong() == val;
}
void init()
{
ByteBuffer write = buffer.duplicate();
while (write.remaining() > 8)
write.putLong(val);
}
}
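    /**
     * Shared state for a single {@link #testAllocate} run: the deadline, the shared recycle
     * queues between neighbouring workers, and the barriers used to stop allocations, free
     * all memory and resume.
     */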
private static final class TestEnvironment
{
final int threadCount;
final long duration;
final int poolSize;
final long until;
final CountDownLatch latch;
final SPSCQueue<BufferCheck>[] sharedRecycle;
volatile boolean shouldFreeMemoryAndSuspend = false;
final CyclicBarrier stopAllocationsBarrier;
final CyclicBarrier freedAllMemoryBarrier;
final CyclicBarrier resumeAllocationsBarrier;
final ExecutorService executorService;
final List<Future<Boolean>> threadResultFuture;
final int targetSizeQuanta;
TestEnvironment(int threadCount, long duration, long poolSize)
{
this.threadCount = threadCount;
this.duration = duration;
this.poolSize = Math.toIntExact(poolSize);
until = System.nanoTime() + duration;
latch = new CountDownLatch(threadCount);
sharedRecycle = new SPSCQueue[threadCount];
// N worker threads + burner thread + main thread:
stopAllocationsBarrier = new CyclicBarrier(threadCount + 2);
freedAllMemoryBarrier = new CyclicBarrier(threadCount + 2);
resumeAllocationsBarrier = new CyclicBarrier(threadCount + 2);
executorService = Executors.newFixedThreadPool(threadCount + 2, new NamedThreadFactory("test"));
threadResultFuture = new ArrayList<>(threadCount);
for (int i = 0; i < sharedRecycle.length; i++)
{
sharedRecycle[i] = new SPSCQueue<>();
}
            // Divide the poolSize across our threads, deliberately over-subscribing it. Threads
            // allocate a different amount of memory each - 1*quanta, 2*quanta, ... N*quanta.
            // Thread0 is special-cased to a single NORMAL_CHUNK_SIZE chunk; each remaining
            // thread i targets i * targetSizeQuanta bytes.
            //
            // This should divide double the poolSize across the working threads,
            // plus NORMAL_CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair.
targetSizeQuanta = 2 * this.poolSize / sum1toN(threadCount - 1);
}
void addCheckedFuture(Future<Boolean> future)
{
threadResultFuture.add(future);
}
int countDoneThreads()
{
int doneThreads = 0;
for (Future<Boolean> r : threadResultFuture)
{
if (r.isDone())
doneThreads++;
}
return doneThreads;
}
void assertCheckedThreadsSucceeded()
{
try
{
for (Future<Boolean> r : threadResultFuture)
assertTrue(r.get());
}
catch (InterruptedException ex)
{
// If interrupted while checking, restart and check everything.
assertCheckedThreadsSucceeded();
}
catch (ExecutionException ex)
{
fail("Checked thread threw exception: " + ex.toString());
}
}
/**
         * Implementations must ensure all buffers have been returned to the buffer pool when run() exits.
*/
interface MemoryFreeTask
{
void run();
}
/**
* If the main test loop requested stopping the threads by setting
* {@link TestEnvironment#shouldFreeMemoryAndSuspend},
* waits until all threads reach this call and then frees the memory by running the given memory free task.
* After the task finishes, it waits on the {@link TestEnvironment#freedAllMemoryBarrier} and
* {@link TestEnvironment#resumeAllocationsBarrier} to let the main test loop perform the post-free checks.
* The call exits after {@link TestEnvironment#resumeAllocationsBarrier} is reached by all threads.
*
* @param task the task that should return all buffers held by this thread to the buffer pool
*/
void maybeSuspendAndFreeMemory(MemoryFreeTask task) throws InterruptedException, BrokenBarrierException
{
if (shouldFreeMemoryAndSuspend)
{
try
{
                    // Wait until allocations stop in all threads; this guarantees this thread won't
                    // receive any new buffers from other threads while freeing memory.
stopAllocationsBarrier.await();
// Free our memory
task.run();
// Now wait for the other threads to free their memory
freedAllMemoryBarrier.await();
// Now all memory is freed, but let's not resume allocations until the main test thread
// performs the required checks.
// At this point, used memory indicated by the pool
// should be == 0 and all buffers should be recycled.
resumeAllocationsBarrier.await();
}
catch (BrokenBarrierException | InterruptedException e)
{
// At the end of the test some threads may have already exited,
// so they can't arrive at one of the barriers, and we may end up here.
// This is fine if this happens after the test deadline, and we
// just allow the test worker to exit cleanly.
// It must not happen before the test deadline though, it would likely be a bug,
// so we rethrow in that case.
if (System.nanoTime() < until)
throw e;
}
}
}
}
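    /**
     * Runs the burn and worker threads until the deadline, periodically suspending them to verify
     * that all memory has been returned to the pool and that chunks keep being recycled.
     */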
public void testAllocate(BufferPool bufferPool, int threadCount, long duration) throws InterruptedException, ExecutionException, BrokenBarrierException, TimeoutException
{
logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration));
logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold());
Debug debug = new Debug();
bufferPool.debug(debug);
TestEnvironment testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold());
startBurnerThreads(bufferPool, testEnv);
for (int threadIdx = 0; threadIdx < threadCount; threadIdx++)
testEnv.addCheckedFuture(startWorkerThread(bufferPool, testEnv, threadIdx));
while (!testEnv.latch.await(1L, TimeUnit.SECONDS))
{
try
{
// request all threads to release all buffers to the bufferPool
testEnv.shouldFreeMemoryAndSuspend = true;
testEnv.stopAllocationsBarrier.await(10, TimeUnit.SECONDS);
// wait until all memory released
testEnv.freedAllMemoryBarrier.await(10, TimeUnit.SECONDS);
// now all buffers should be back in the pool, and no more allocations happening
assert bufferPool.usedSizeInBytes() == 0 : "Some buffers haven't been freed. Memory in use = "
+ bufferPool.usedSizeInBytes() + " (expected 0)";
debug.check();
                // resume threads only after debug.recycleRound has been incremented
testEnv.shouldFreeMemoryAndSuspend = false;
testEnv.resumeAllocationsBarrier.await(10, TimeUnit.SECONDS);
}
catch (TimeoutException e)
{
// a thread that is done will not reach the barriers, so timeout is unexpected only if
// all threads are still running
if (testEnv.countDoneThreads() == 0)
{
logger.error("Some threads have stalled and didn't reach the barrier", e);
return;
}
}
}
for (SPSCQueue<BufferCheck> queue : testEnv.sharedRecycle)
{
BufferCheck check;
while ( null != (check = queue.poll()) )
{
check.validate();
bufferPool.put(check.buffer);
}
}
assertEquals(0, testEnv.executorService.shutdownNow().size());
logger.info("Reverting BufferPool DEBUG config");
bufferPool.debug(BufferPool.Debug.NO_OP);
testEnv.assertCheckedThreadsSucceeded();
logger.info("{} - finished.", DATE_FORMAT.format(new Date()));
}
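    /**
     * Starts a worker that allocates towards its target size, validating buffer contents and
     * either freeing buffers directly or handing them to the next worker via a shared SPSC queue.
     */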
private Future<Boolean> startWorkerThread(BufferPool bufferPool, TestEnvironment testEnv, final int threadIdx)
{
return testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
final int targetSize = threadIdx == 0 ? BufferPool.NORMAL_CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx;
final SPSCQueue<BufferCheck> shareFrom = testEnv.sharedRecycle[threadIdx];
final DynamicList<BufferCheck> checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10)));
final SPSCQueue<BufferCheck> shareTo = testEnv.sharedRecycle[(threadIdx + 1) % testEnv.threadCount];
final Future<Boolean> neighbourResultFuture = testEnv.threadResultFuture.get((threadIdx + 1) % testEnv.threadCount);
final ThreadLocalRandom rand = ThreadLocalRandom.current();
int totalSize = 0;
int freeingSize = 0;
int size = 0;
void testOne() throws Exception
{
testEnv.maybeSuspendAndFreeMemory(this::freeAll);
long currentTargetSize = rand.nextInt(testEnv.poolSize / 1024) == 0 ? 0 : targetSize;
int spinCount = 0;
while (totalSize > currentTargetSize - freeingSize)
{
// Don't get stuck in this loop if other threads might be suspended:
if (testEnv.shouldFreeMemoryAndSuspend)
return;
// Don't get stuck in this loop if the neighbour thread exited:
if (neighbourResultFuture.isDone())
return;
// free buffers until we're below our target size
if (checks.size() == 0)
{
// if we're out of buffers to free, we're waiting on our neighbour to free them;
// first check if the consuming neighbour has caught up, and if so mark that free
if (shareTo.exhausted)
{
totalSize -= freeingSize;
freeingSize = 0;
}
else if (!recycleFromNeighbour())
{
if (++spinCount > 1000 && System.nanoTime() > until)
return;
                            // otherwise, free one of our other neighbour's buffers if we can; failing that, yield
Thread.yield();
}
continue;
}
// pick a random buffer, with preference going to earlier ones
BufferCheck check = sample();
checks.remove(check.listnode);
check.validate();
size = BufferPool.roundUp(check.buffer.capacity());
if (size > BufferPool.NORMAL_CHUNK_SIZE)
size = 0;
// either share to free, or free immediately
if (rand.nextBoolean())
{
shareTo.add(check);
freeingSize += size;
// interleave this with potentially messing with the other neighbour's stuff
recycleFromNeighbour();
}
else
{
check.validate();
bufferPool.put(check.buffer);
totalSize -= size;
}
}
// allocate a new buffer
size = (int) Math.max(1, AVG_BUFFER_SIZE + (STDEV_BUFFER_SIZE * rand.nextGaussian()));
if (size <= BufferPool.NORMAL_CHUNK_SIZE)
{
totalSize += BufferPool.roundUp(size);
allocate(size);
}
else if (rand.nextBoolean())
{
allocate(size);
}
else
{
// perform a burst allocation to exhaust all available memory
while (totalSize < testEnv.poolSize)
{
size = (int) Math.max(1, AVG_BUFFER_SIZE + (STDEV_BUFFER_SIZE * rand.nextGaussian()));
if (size <= BufferPool.NORMAL_CHUNK_SIZE)
{
allocate(size);
totalSize += BufferPool.roundUp(size);
}
}
}
// validate a random buffer we have stashed
checks.get(rand.nextInt(checks.size())).validate();
// free all of our neighbour's remaining shared buffers
while (recycleFromNeighbour());
}
/**
* Returns all allocated buffers back to the buffer pool.
*/
void freeAll()
{
while (checks.size() > 0)
{
BufferCheck check = sample();
checks.remove(check.listnode);
check.validate();
bufferPool.put(check.buffer);
}
BufferCheck check;
while ((check = shareFrom.poll()) != null)
{
check.validate();
bufferPool.put(check.buffer);
}
bufferPool.releaseLocal();
}
void cleanup()
{
while (checks.size() > 0)
{
BufferCheck check = checks.get(0);
bufferPool.put(check.buffer);
checks.remove(check.listnode);
}
testEnv.latch.countDown();
}
boolean recycleFromNeighbour()
{
BufferCheck check = shareFrom.poll();
if (check == null)
return false;
check.validate();
bufferPool.put(check.buffer);
return true;
}
BufferCheck allocate(int size)
{
ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
BufferCheck check = new BufferCheck(buffer, rand.nextLong());
assertEquals(size, buffer.capacity());
assertEquals(0, buffer.position());
check.init();
check.listnode = checks.append(check);
return check;
}
BufferCheck sample()
{
                // sample with preference to first elements:
                // element at index n will be selected with likelihood (size - n) / sum1toN(size)
                int size = checks.size();
                // pick a random number in [0, sum1toN(size))
                int sampleRange = sum1toN(size);
                int sampleIndex = rand.nextInt(sampleRange);
                // then binary search for the index N such that [sum1toN(N), sum1toN(N + 1)) contains this random number
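                // e.g. with size == 4: sampleRange == 10, and after the inversion at the bottom of
                // this method indices 0, 1, 2, 3 are returned with probability 4/10, 3/10, 2/10, 1/10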
int moveBy = Math.max(size / 4, 1);
int index = size / 2;
while (true)
{
int baseSampleIndex = sum1toN(index);
int endOfSampleIndex = sum1toN(index + 1);
if (sampleIndex >= baseSampleIndex)
{
if (sampleIndex < endOfSampleIndex)
break;
index += moveBy;
}
else index -= moveBy;
moveBy = Math.max(moveBy / 2, 1);
}
// this gives us the inverse of our desired value, so just subtract it from the last index
index = size - (index + 1);
return checks.get(index);
}
});
}
private void startBurnerThreads(BufferPool bufferPool, TestEnvironment testEnv)
{
// setup some high churn allocate/deallocate, without any checking
final AtomicLong pendingBuffersCount = new AtomicLong(0);
final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
final CountDownLatch doneAdd = new CountDownLatch(1);
testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
int count = 0;
final ThreadLocalRandom rand = ThreadLocalRandom.current();
void testOne() throws Exception
{
if (count * BufferPool.NORMAL_CHUNK_SIZE >= testEnv.poolSize / 10)
{
if (pendingBuffersCount.get() == 0)
{
count = 0;
testEnv.maybeSuspendAndFreeMemory(bufferPool::releaseLocal);
                }
                else
                {
                    Thread.yield();
                }
return;
}
ByteBuffer buffer = rand.nextInt(4) < 1
? bufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE)
: bufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT);
if (buffer == null)
{
Thread.yield();
return;
}
// 50/50 chance of returning the buffer from the producer thread, or
// pass it on to the consumer.
if (rand.nextBoolean())
bufferPool.put(buffer);
else
{
pendingBuffersCount.incrementAndGet();
burn.add(buffer);
}
count++;
}
void cleanup()
{
doneAdd.countDown();
}
}));
testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
void testOne() throws Exception
{
ByteBuffer buffer = burn.poll();
if (buffer == null)
{
Thread.yield();
return;
}
bufferPool.put(buffer);
pendingBuffersCount.decrementAndGet();
}
void cleanup()
{
Uninterruptibles.awaitUninterruptibly(doneAdd);
}
}));
}
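    /**
     * Base class for the test threads: repeatedly invokes {@link #testOne()} until the deadline
     * passes, returning false (rather than propagating) if any exception or error escapes.
     */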
    abstract static class TestUntil implements Callable<Boolean>
{
final BufferPool bufferPool;
final long until;
protected TestUntil(BufferPool bufferPool, long until)
{
this.bufferPool = bufferPool;
this.until = until;
}
abstract void testOne() throws Exception;
void checkpoint() {}
void cleanup() {}
public Boolean call() throws Exception
{
try
{
while (System.nanoTime() < until)
{
checkpoint();
for (int i = 0 ; i < 100 ; i++)
testOne();
}
}
catch (Exception ex)
{
logger.error("Got exception {}, current chunk {}",
ex.getMessage(),
bufferPool.unsafeCurrentChunk());
ex.printStackTrace();
return false;
}
catch (Throwable tr) // for java.lang.OutOfMemoryError
{
logger.error("Got throwable {}, current chunk {}",
tr.getMessage(),
bufferPool.unsafeCurrentChunk());
tr.printStackTrace();
return false;
}
finally
{
cleanup();
}
return true;
}
}
public static void main(String[] args)
{
try
{
LongBufferPoolTest.setup();
new LongBufferPoolTest().testAllocate(new BufferPool("test_pool", 16 << 20, true),
Runtime.getRuntime().availableProcessors(),
TimeUnit.HOURS.toNanos(2L));
System.exit(0);
}
catch (Throwable tr)
{
logger.error("Test failed - {}", tr.getMessage(), tr);
System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure
}
}
/**
* A single producer, single consumer queue.
*/
private static final class SPSCQueue<V>
{
static final class Node<V>
{
volatile Node<V> next;
final V value;
Node(V value)
{
this.value = value;
}
}
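        // set by the consumer when it observes an empty queue, cleared by the producer on add;
        // lets a producer detect that its consumer has caught up with everything shared so far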
private volatile boolean exhausted = true;
Node<V> head = new Node<>(null);
Node<V> tail = head;
void add(V value)
{
exhausted = false;
tail = tail.next = new Node<>(value);
}
V poll()
{
Node<V> next = head.next;
if (next == null)
{
                // this is racy, but good enough for our purposes
exhausted = true;
return null;
}
head = next;
return next.value;
}
}
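    /** Sum of the integers 1..n, i.e. the n-th triangular number n * (n + 1) / 2. */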
private static int sum1toN(int n)
{
return (n * (n + 1)) / 2;
}
}