| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.utils.memory; |
| |
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.BufferPoolMetrics;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.Ref;

import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
| |
| /** |
| * A pool of ByteBuffers that can be recycled. |
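 *
 * A typical usage sketch: buffers are taken from the pool and must be returned
 * with put() so that their slice of the parent chunk can be recycled.
 * <pre>
 *     ByteBuffer buffer = BufferPool.get(1024, BufferType.OFF_HEAP);
 *     try
 *     {
 *         // ... fill or drain the buffer ...
 *     }
 *     finally
 *     {
 *         BufferPool.put(buffer);
 *     }
 * </pre>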
| */ |
| public class BufferPool |
| { |
| /** The size of a page aligned buffer, 64KiB */ |
| static final int CHUNK_SIZE = 64 << 10; |
| |
| @VisibleForTesting |
| public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; |
| |
| @VisibleForTesting |
    public static boolean ALLOCATE_ON_HEAP_WHEN_EXHAUSTED = DatabaseDescriptor.getBufferPoolUseHeapIfExhausted();
| |
| @VisibleForTesting |
| public static boolean DISABLED = Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", "false")); |
| |
| @VisibleForTesting |
| public static boolean DEBUG = false; |
| |
| private static final Logger logger = LoggerFactory.getLogger(BufferPool.class); |
| private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES); |
| private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); |
| |
| /** A global pool of chunks (page aligned buffers) */ |
| private static final GlobalPool globalPool = new GlobalPool(); |
| |
| /** A thread local pool of chunks, where chunks come from the global pool */ |
    private static final ThreadLocal<LocalPool> localPool = ThreadLocal.withInitial(LocalPool::new);
| |
| public static ByteBuffer get(int size) |
| { |
| if (DISABLED) |
            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXHAUSTED);
| else |
            return takeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXHAUSTED);
| } |
| |
| public static ByteBuffer get(int size, BufferType bufferType) |
| { |
| boolean direct = bufferType == BufferType.OFF_HEAP; |
| if (DISABLED || !direct) |
| return allocate(size, !direct); |
| else |
| return takeFromPool(size, !direct); |
| } |
| |
    /**
     * Unlike the get methods, this returns null when the pooled memory is exhausted, rather than
     * falling back to allocating outside the pool. (Requests larger than CHUNK_SIZE, or made while
     * the pool is disabled, are still allocated directly.)
     */
| public static ByteBuffer tryGet(int size) |
| { |
| if (DISABLED) |
            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXHAUSTED);
| else |
            return maybeTakeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXHAUSTED);
| } |
| |
| private static ByteBuffer allocate(int size, boolean onHeap) |
| { |
| return onHeap |
| ? ByteBuffer.allocate(size) |
| : ByteBuffer.allocateDirect(size); |
| } |
| |
| private static ByteBuffer takeFromPool(int size, boolean allocateOnHeapWhenExhausted) |
| { |
| ByteBuffer ret = maybeTakeFromPool(size, allocateOnHeapWhenExhausted); |
| if (ret != null) |
| return ret; |
| |
| if (logger.isTraceEnabled()) |
| logger.trace("Requested buffer size {} has been allocated directly due to lack of capacity", size); |
| |
| return localPool.get().allocate(size, allocateOnHeapWhenExhausted); |
| } |
| |
| private static ByteBuffer maybeTakeFromPool(int size, boolean allocateOnHeapWhenExhausted) |
| { |
| if (size < 0) |
| throw new IllegalArgumentException("Size must be positive (" + size + ")"); |
| |
| if (size == 0) |
| return EMPTY_BUFFER; |
| |
| if (size > CHUNK_SIZE) |
| { |
| if (logger.isTraceEnabled()) |
| logger.trace("Requested buffer size {} is bigger than {}, allocating directly", size, CHUNK_SIZE); |
| |
| return localPool.get().allocate(size, allocateOnHeapWhenExhausted); |
| } |
| |
| return localPool.get().get(size); |
| } |
| |
| public static void put(ByteBuffer buffer) |
| { |
| if (!(DISABLED || buffer.hasArray())) |
| localPool.get().put(buffer); |
| } |
| |
| /** This is not thread safe and should only be used for unit testing. */ |
| @VisibleForTesting |
| static void reset() |
| { |
| localPool.get().reset(); |
| globalPool.reset(); |
| } |
| |
| @VisibleForTesting |
| static Chunk currentChunk() |
| { |
| return localPool.get().chunks[0]; |
| } |
| |
| @VisibleForTesting |
| static int numChunks() |
| { |
| int ret = 0; |
| for (Chunk chunk : localPool.get().chunks) |
| { |
| if (chunk != null) |
| ret++; |
| } |
| return ret; |
| } |
| |
| @VisibleForTesting |
| static void assertAllRecycled() |
| { |
| globalPool.debug.check(); |
| } |
| |
| public static long sizeInBytes() |
| { |
| return globalPool.sizeInBytes(); |
| } |
| |
| static final class Debug |
| { |
| long recycleRound = 1; |
| final Queue<Chunk> allChunks = new ConcurrentLinkedQueue<>(); |
| void register(Chunk chunk) |
| { |
| allChunks.add(chunk); |
| } |
| void recycle(Chunk chunk) |
| { |
| chunk.lastRecycled = recycleRound; |
| } |
| void check() |
| { |
| for (Chunk chunk : allChunks) |
| assert chunk.lastRecycled == recycleRound; |
| recycleRound++; |
| } |
| } |
| |
| /** |
| * A queue of page aligned buffers, the chunks, which have been sliced from bigger chunks, |
| * the macro-chunks, also page aligned. Macro-chunks are allocated as long as we have not exceeded the |
| * memory maximum threshold, MEMORY_USAGE_THRESHOLD and are never released. |
| * |
| * This class is shared by multiple thread local pools and must be thread-safe. |
| */ |
| static final class GlobalPool |
| { |
        /** The size of a bigger chunk, 1 MiB; it must be a multiple of CHUNK_SIZE, so with the default 64KiB chunks each macro-chunk is sliced into 16 chunks */
| static final int MACRO_CHUNK_SIZE = 1 << 20; |
| |
| static |
| { |
| assert Integer.bitCount(CHUNK_SIZE) == 1; // must be a power of 2 |
| assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 |
| assert MACRO_CHUNK_SIZE % CHUNK_SIZE == 0; // must be a multiple |
| |
| if (DISABLED) |
| logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); |
| else |
| logger.info("Global buffer pool is enabled, when pool is exahusted (max is {} mb) it will allocate {}", |
| MEMORY_USAGE_THRESHOLD / (1024L * 1024L), |
| ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); |
| } |
| |
| private final Debug debug = new Debug(); |
| private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>(); |
| // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage |
| private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>(); |
| private final AtomicLong memoryUsage = new AtomicLong(); |
| |
        /** Return a chunk; the caller takes ownership of it. */
| public Chunk get() |
| { |
| while (true) |
| { |
| Chunk chunk = chunks.poll(); |
| if (chunk != null) |
| return chunk; |
| |
| if (!allocateMoreChunks()) |
| // give it one last attempt, in case someone else allocated before us |
| return chunks.poll(); |
| } |
| } |
| |
| /** |
| * This method might be called by multiple threads and that's fine if we add more |
| * than one chunk at the same time as long as we don't exceed the MEMORY_USAGE_THRESHOLD. |
| */ |
| private boolean allocateMoreChunks() |
| { |
| while (true) |
| { |
| long cur = memoryUsage.get(); |
| if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD) |
| { |
| noSpamLogger.info("Maximum memory usage reached ({} bytes), cannot allocate chunk of {} bytes", |
| MEMORY_USAGE_THRESHOLD, MACRO_CHUNK_SIZE); |
| return false; |
| } |
| if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE)) |
| break; |
| } |
| |
| // allocate a large chunk |
| Chunk chunk = new Chunk(allocateDirectAligned(MACRO_CHUNK_SIZE)); |
| chunk.acquire(null); |
| macroChunks.add(chunk); |
| for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE) |
| { |
| Chunk add = new Chunk(chunk.get(CHUNK_SIZE)); |
| chunks.add(add); |
| if (DEBUG) |
| debug.register(add); |
| } |
| |
| return true; |
| } |
| |
| public void recycle(Chunk chunk) |
| { |
| chunks.add(chunk); |
| } |
| |
| public long sizeInBytes() |
| { |
| return memoryUsage.get(); |
| } |
| |
| /** This is not thread safe and should only be used for unit testing. */ |
| @VisibleForTesting |
| void reset() |
| { |
| while (!chunks.isEmpty()) |
| chunks.poll().reset(); |
| |
| while (!macroChunks.isEmpty()) |
| macroChunks.poll().reset(); |
| |
| memoryUsage.set(0); |
| } |
| } |
| |
| /** |
| * A thread local class that grabs chunks from the global pool for this thread allocations. |
| * Only one thread can do the allocations but multiple threads can release the allocations. |
| */ |
| static final class LocalPool |
| { |
| private final static BufferPoolMetrics metrics = new BufferPoolMetrics(); |
        // a microqueue of Chunks:
        // * if any are null, they are at the end;
        // * new Chunks are added to the last null index
        // * if no null indexes are available, the chunk with the least free space is released, the last
        //   chunk is moved into its slot, and the new chunk takes the last position
        // * this results in a queue that will typically be visited in ascending order of available space, so that
        //   small allocations preferentially slice from the Chunks with the smallest space available to furnish them
        // WARNING: if we ever change the size of this, we must update removeFromLocalQueue, and addChunk
| private final Chunk[] chunks = new Chunk[3]; |
| private byte chunkCount = 0; |
| |
| public LocalPool() |
| { |
| localPoolReferences.add(new LocalPoolRef(this, localPoolRefQueue)); |
| } |
| |
| private Chunk addChunkFromGlobalPool() |
| { |
| Chunk chunk = globalPool.get(); |
| if (chunk == null) |
| return null; |
| |
| addChunk(chunk); |
| return chunk; |
| } |
| |
| private void addChunk(Chunk chunk) |
| { |
| chunk.acquire(this); |
| |
| if (chunkCount < 3) |
| { |
| chunks[chunkCount++] = chunk; |
| return; |
| } |
| |
| int smallestChunkIdx = 0; |
| if (chunks[1].free() < chunks[0].free()) |
| smallestChunkIdx = 1; |
| if (chunks[2].free() < chunks[smallestChunkIdx].free()) |
| smallestChunkIdx = 2; |
| |
| chunks[smallestChunkIdx].release(); |
| if (smallestChunkIdx != 2) |
| chunks[smallestChunkIdx] = chunks[2]; |
| chunks[2] = chunk; |
| } |
| |
| public ByteBuffer get(int size) |
| { |
| for (Chunk chunk : chunks) |
| { // first see if our own chunks can serve this buffer |
| if (chunk == null) |
| break; |
| |
| ByteBuffer buffer = chunk.get(size); |
| if (buffer != null) |
| return buffer; |
| } |
| |
| // else ask the global pool |
| Chunk chunk = addChunkFromGlobalPool(); |
| if (chunk != null) |
| return chunk.get(size); |
| |
| return null; |
| } |
| |
| private ByteBuffer allocate(int size, boolean onHeap) |
| { |
| metrics.misses.mark(); |
| return BufferPool.allocate(size, onHeap); |
| } |
| |
| public void put(ByteBuffer buffer) |
| { |
| Chunk chunk = Chunk.getParentChunk(buffer); |
| if (chunk == null) |
| { |
| FileUtils.clean(buffer); |
| return; |
| } |
| |
| LocalPool owner = chunk.owner; |
| // ask the free method to take exclusive ownership of the act of recycling |
| // if we are either: already not owned by anyone, or owned by ourselves |
| long free = chunk.free(buffer, owner == null | owner == this); |
| if (free == 0L) |
| { |
| // 0L => we own recycling responsibility, so must recycle; |
| chunk.recycle(); |
| // if we are also the owner, we must remove the Chunk from our local queue |
| if (owner == this) |
| removeFromLocalQueue(chunk); |
| } |
            else if (free == -1L && owner != this && chunk.owner == null)
| { |
| // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset. |
| // we must also check after completely freeing if the owner has since been unset, and try to recycle |
| chunk.tryRecycle(); |
| } |
| } |
| |
| private void removeFromLocalQueue(Chunk chunk) |
| { |
| // since we only have three elements in the queue, it is clearer, easier and faster to just hard code the options |
| if (chunks[0] == chunk) |
| { // remove first by shifting back second two |
| chunks[0] = chunks[1]; |
| chunks[1] = chunks[2]; |
| } |
| else if (chunks[1] == chunk) |
| { // remove second by shifting back last |
| chunks[1] = chunks[2]; |
| } |
| else assert chunks[2] == chunk; |
            // whatever we do, the last element must be null
| chunks[2] = null; |
| chunkCount--; |
| } |
| |
| @VisibleForTesting |
| void reset() |
| { |
| chunkCount = 0; |
| for (int i = 0; i < chunks.length; i++) |
| { |
| if (chunks[i] != null) |
| { |
| chunks[i].owner = null; |
| chunks[i].freeSlots = 0L; |
| chunks[i].recycle(); |
| chunks[i] = null; |
| } |
| } |
| } |
| } |
| |
| private static final class LocalPoolRef extends PhantomReference<LocalPool> |
| { |
| private final Chunk[] chunks; |
| public LocalPoolRef(LocalPool localPool, ReferenceQueue<? super LocalPool> q) |
| { |
| super(localPool, q); |
| chunks = localPool.chunks; |
| } |
| |
| public void release() |
| { |
| for (int i = 0 ; i < chunks.length ; i++) |
| { |
| if (chunks[i] != null) |
| { |
| chunks[i].release(); |
| chunks[i] = null; |
| } |
| } |
| } |
| } |
| |
| private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>(); |
| |
| private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>(); |
| private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start(); |
| |
| private static void cleanupOneReference() throws InterruptedException |
| { |
| Object obj = localPoolRefQueue.remove(100); |
| if (obj instanceof LocalPoolRef) |
| { |
| ((LocalPoolRef) obj).release(); |
| localPoolReferences.remove(obj); |
| } |
| } |
| |
| private static ByteBuffer allocateDirectAligned(int capacity) |
| { |
| int align = MemoryUtil.pageSize(); |
| if (Integer.bitCount(align) != 1) |
| throw new IllegalArgumentException("Alignment must be a power of 2"); |
| |
| ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + align); |
| long address = MemoryUtil.getAddress(buffer); |
| long offset = address & (align -1); // (address % align) |
| |
| if (offset == 0) |
| { // already aligned |
| buffer.limit(capacity); |
| } |
| else |
| { // shift by offset |
| int pos = (int)(align - offset); |
| buffer.position(pos); |
| buffer.limit(pos + capacity); |
| } |
| |
| return buffer.slice(); |
| } |
| |
| /** |
| * A memory chunk: it takes a buffer (the slab) and slices it |
| * into smaller buffers when requested. |
| * |
| * It divides the slab into 64 units and keeps a long mask, freeSlots, |
| * indicating if a unit is in use or not. Each bit in freeSlots corresponds |
| * to a unit, if the bit is set then the unit is free (available for allocation) |
| * whilst if it is not set then the unit is in use. |
| * |
| * When we receive a request of a given size we round up the size to the nearest |
| * multiple of allocation units required. Then we search for n consecutive free units, |
| * where n is the number of units required. We also align to page boundaries. |
| * |
     * When we receive a release request we work out the position by comparing the buffer
| * address to our base address and we simply release the units. |
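     *
     * For example, with the default 64KiB chunk the allocation unit is
     * 64KiB / 64 = 1KiB, so a request for 1500 bytes is rounded up to two
     * units (2KiB) and served by claiming two adjacent bits of freeSlots.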
| */ |
| final static class Chunk |
| { |
| private final ByteBuffer slab; |
| private final long baseAddress; |
| private final int shift; |
| |
| private volatile long freeSlots; |
| private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots"); |
| |
| // the pool that is _currently allocating_ from this Chunk |
| // if this is set, it means the chunk may not be recycled because we may still allocate from it; |
| // if it has been unset the local pool has finished with it, and it may be recycled |
| private volatile LocalPool owner; |
| private long lastRecycled; |
| private final Chunk original; |
| |
| Chunk(Chunk recycle) |
| { |
| assert recycle.freeSlots == 0L; |
| this.slab = recycle.slab; |
| this.baseAddress = recycle.baseAddress; |
| this.shift = recycle.shift; |
| this.freeSlots = -1L; |
| this.original = recycle.original; |
| if (DEBUG) |
| globalPool.debug.recycle(original); |
| } |
| |
| Chunk(ByteBuffer slab) |
| { |
| assert !slab.hasArray(); |
| this.slab = slab; |
| this.baseAddress = MemoryUtil.getAddress(slab); |
| |
| // The number of bits by which we need to shift to obtain a unit |
| // "31 &" is because numberOfTrailingZeros returns 32 when the capacity is zero |
| this.shift = 31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64)); |
| // -1 means all free whilst 0 means all in use |
| this.freeSlots = slab.capacity() == 0 ? 0L : -1L; |
| this.original = DEBUG ? this : null; |
| } |
| |
| /** |
| * Acquire the chunk for future allocations: set the owner and prep |
| * the free slots mask. |
| */ |
| void acquire(LocalPool owner) |
| { |
| assert this.owner == null; |
| this.owner = owner; |
| } |
| |
| /** |
| * Set the owner to null and return the chunk to the global pool if the chunk is fully free. |
| * This method must be called by the LocalPool when it is certain that |
| * the local pool shall never try to allocate any more buffers from this chunk. |
| */ |
| void release() |
| { |
| this.owner = null; |
| tryRecycle(); |
| } |
| |
| void tryRecycle() |
| { |
| assert owner == null; |
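            // atomically transition the mask from "all free" (-1L) to "claimed" (0L); only the
            // thread winning this CAS may hand the chunk back to the global pool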
| if (isFree() && freeSlotsUpdater.compareAndSet(this, -1L, 0L)) |
| recycle(); |
| } |
| |
| void recycle() |
| { |
| assert freeSlots == 0L; |
| globalPool.recycle(new Chunk(this)); |
| } |
| |
| /** |
| * We stash the chunk in the attachment of a buffer |
| * that was returned by get(), this method simply |
| * retrives the chunk that sliced a buffer, if any. |
| */ |
| static Chunk getParentChunk(ByteBuffer buffer) |
| { |
| Object attachment = MemoryUtil.getAttachment(buffer); |
| |
| if (attachment instanceof Chunk) |
| return (Chunk) attachment; |
| |
| if (attachment instanceof Ref) |
| return ((Ref<Chunk>) attachment).get(); |
| |
| return null; |
| } |
| |
| ByteBuffer setAttachment(ByteBuffer buffer) |
| { |
| if (Ref.DEBUG_ENABLED) |
| MemoryUtil.setAttachment(buffer, new Ref<>(this, null)); |
| else |
| MemoryUtil.setAttachment(buffer, this); |
| |
| return buffer; |
| } |
| |
| boolean releaseAttachment(ByteBuffer buffer) |
| { |
| Object attachment = MemoryUtil.getAttachment(buffer); |
| if (attachment == null) |
| return false; |
| |
| if (attachment instanceof Ref) |
| ((Ref<Chunk>) attachment).release(); |
| |
| return true; |
| } |
| |
| @VisibleForTesting |
| void reset() |
| { |
| Chunk parent = getParentChunk(slab); |
| if (parent != null) |
| parent.free(slab, false); |
| else |
| FileUtils.clean(slab); |
| } |
| |
| @VisibleForTesting |
| long setFreeSlots(long val) |
| { |
| long ret = freeSlots; |
| freeSlots = val; |
| return ret; |
| } |
| |
| int capacity() |
| { |
| return 64 << shift; |
| } |
| |
| final int unit() |
| { |
| return 1 << shift; |
| } |
| |
| final boolean isFree() |
| { |
| return freeSlots == -1L; |
| } |
| |
| /** The total free size */ |
| int free() |
| { |
| return Long.bitCount(freeSlots) * unit(); |
| } |
| |
| /** |
| * Return the next available slice of this size. If |
| * we have exceeded the capacity we return null. |
| */ |
| ByteBuffer get(int size) |
| { |
| // how many multiples of our units is the size? |
| // we add (unit - 1), so that when we divide by unit (>>> shift), we effectively round up |
| int slotCount = (size - 1 + unit()) >>> shift; |
| |
| // if we require more than 64 slots, we cannot possibly accommodate the allocation |
| if (slotCount > 64) |
| return null; |
| |
| // convert the slotCount into the bits needed in the bitmap, but at the bottom of the register |
| long slotBits = -1L >>> (64 - slotCount); |
| |
| // in order that we always allocate page aligned results, we require that any allocation is "somewhat" aligned |
| // i.e. any single unit allocation can go anywhere; any 2 unit allocation must begin in one of the first 3 slots |
| // of a page; a 3 unit must go in the first two slots; and any four unit allocation must be fully page-aligned |
| |
| // to achieve this, we construct a searchMask that constrains the bits we find to those we permit starting |
| // a match from. as we find bits, we remove them from the mask to continue our search. |
| // this has an odd property when it comes to concurrent alloc/free, as we can safely skip backwards if |
| // a new slot is freed up, but we always make forward progress (i.e. never check the same bits twice), |
| // so running time is bounded |
| long searchMask = 0x1111111111111111L; |
| searchMask *= 15L >>> ((slotCount - 1) & 3); |
| // i.e. switch (slotCount & 3) |
| // case 1: searchMask = 0xFFFFFFFFFFFFFFFFL |
| // case 2: searchMask = 0x7777777777777777L |
| // case 3: searchMask = 0x3333333333333333L |
| // case 0: searchMask = 0x1111111111111111L |
| |
            // truncate the mask, removing bits that have too few slots following them
| searchMask &= -1L >>> (slotCount - 1); |
| |
| // this loop is very unroll friendly, and would achieve high ILP, but not clear if the compiler will exploit this. |
| // right now, not worth manually exploiting, but worth noting for future |
| while (true) |
| { |
| long cur = freeSlots; |
| // find the index of the lowest set bit that also occurs in our mask (i.e. is permitted alignment, and not yet searched) |
| // we take the index, rather than finding the lowest bit, since we must obtain it anyway, and shifting is more efficient |
| // than multiplication |
| int index = Long.numberOfTrailingZeros(cur & searchMask); |
| |
| // if no bit was actually found, we cannot serve this request, so return null. |
| // due to truncating the searchMask this immediately terminates any search when we run out of indexes |
| // that could accommodate the allocation, i.e. is equivalent to checking (64 - index) < slotCount |
| if (index == 64) |
| return null; |
| |
| // remove this bit from our searchMask, so we don't return here next round |
| searchMask ^= 1L << index; |
| // if our bits occur starting at the index, remove ourselves from the bitmask and return |
| long candidate = slotBits << index; |
| if ((candidate & cur) == candidate) |
| { |
| // here we are sure we will manage to CAS successfully without changing candidate because |
| // there is only one thread allocating at the moment, the concurrency is with the release |
| // operations only |
| while (true) |
| { |
| // clear the candidate bits (freeSlots &= ~candidate) |
| if (freeSlotsUpdater.compareAndSet(this, cur, cur & ~candidate)) |
| break; |
| |
| cur = freeSlots; |
| // make sure no other thread has cleared the candidate bits |
| assert ((candidate & cur) == candidate); |
| } |
| return get(index << shift, size); |
| } |
| } |
| } |
| |
| private ByteBuffer get(int offset, int size) |
| { |
| slab.limit(offset + size); |
| slab.position(offset); |
| |
| return setAttachment(slab.slice()); |
| } |
| |
| /** |
| * Round the size to the next unit multiple. |
| */ |
| int roundUp(int v) |
| { |
| return BufferPool.roundUp(v, unit()); |
| } |
| |
| /** |
| * Release a buffer. Return: |
| * 0L if the buffer must be recycled after the call; |
| * -1L if it is free (and so we should tryRecycle if owner is now null) |
| * some other value otherwise |
| **/ |
| long free(ByteBuffer buffer, boolean tryRelease) |
| { |
| if (!releaseAttachment(buffer)) |
| return 1L; |
| |
| long address = MemoryUtil.getAddress(buffer); |
| assert (address >= baseAddress) & (address <= baseAddress + capacity()); |
| |
| int position = (int)(address - baseAddress); |
| int size = roundUp(buffer.capacity()); |
| |
| position >>= shift; |
| int slotCount = size >> shift; |
| |
| long slotBits = (1L << slotCount) - 1; |
| long shiftedSlotBits = (slotBits << position); |
| |
| if (slotCount == 64) |
| { |
| assert size == capacity(); |
| assert position == 0; |
| shiftedSlotBits = -1L; |
| } |
| |
| long next; |
| while (true) |
| { |
| long cur = freeSlots; |
| next = cur | shiftedSlotBits; |
| assert next == (cur ^ shiftedSlotBits); // ensure no double free |
| if (tryRelease && (next == -1L)) |
| next = 0L; |
| if (freeSlotsUpdater.compareAndSet(this, cur, next)) |
| return next; |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, Long.toBinaryString(freeSlots), capacity(), free()); |
| } |
| } |
| |
| @VisibleForTesting |
| public static int roundUpNormal(int size) |
| { |
| return roundUp(size, CHUNK_SIZE / 64); |
| } |
| |
| private static int roundUp(int size, int unit) |
| { |
| int mask = unit - 1; |
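        // e.g. roundUp(1500, 1024): mask == 1023 and (1500 + 1023) & ~1023 == 2048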
| return (size + mask) & ~mask; |
| } |
| |
| @VisibleForTesting |
| public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException |
| { |
| shutdownNow(Arrays.asList(EXEC)); |
| awaitTermination(timeout, unit, Arrays.asList(EXEC)); |
| } |
| } |