blob: 25ff22af80f3684810bcfcd1bc42a341cf867193 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.conf.ServerConfiguration;
/**
* SkipList allocation buffer to reduce memory fragment.
* Adapted from HBase project.
* <p>
* The SkipListArena is basically a bump-the-pointer allocator that allocates
* big (default 2MB) byte[] chunks from and then handles it out to threads that
* request slices into the array.
* </p>
* <p>
* The purpose of this class is to combat heap fragmentation in the
* bookie. By ensuring that all KeyValues in a given SkipList refer
* only to large chunks of contiguous memory, we ensure that large blocks
* get freed up when the SkipList is flushed.
* </p>
* <p>
* Without the Arena, the byte array allocated during insertion end up
* interleaved throughout the heap, and the old generation gets progressively
* more fragmented until a stop-the-world compacting collection occurs.
* </p>
*/
public class SkipListArena {
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
final int chunkSize;
final int maxAlloc;
public SkipListArena(ServerConfiguration cfg) {
chunkSize = cfg.getSkipListArenaChunkSize();
maxAlloc = cfg.getSkipListArenaMaxAllocSize();
assert maxAlloc <= chunkSize;
}
/**
* Allocate a slice of the given length.
* <p>
* If the size is larger than the maximum size specified for this allocator, returns null.
* </p>
*/
public MemorySlice allocateBytes(int size) {
assert size >= 0;
// Callers should satisfy large allocations directly from JVM since they
// don't cause fragmentation as badly.
if (size > maxAlloc) {
return null;
}
while (true) {
Chunk c = getCurrentChunk();
// Try to allocate from this chunk
int allocOffset = c.alloc(size);
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
return new MemorySlice(c.data, allocOffset);
}
// not enough space!
// try to retire this chunk
retireCurrentChunk(c);
}
}
/**
* Try to retire the current chunk if it is still there.
*/
private void retireCurrentChunk(Chunk c) {
curChunk.compareAndSet(c, null);
// If the CAS fails, that means that someone else already
// retired the chunk for us.
}
/**
* Get the current chunk, or, if there is no current chunk,
* allocate a new one from the JVM.
*/
private Chunk getCurrentChunk() {
while (true) {
// Try to get the chunk
Chunk c = curChunk.get();
if (c != null) {
return c;
}
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
c = new Chunk(chunkSize);
if (curChunk.compareAndSet(null, c)) {
c.init();
return c;
}
// lost race
}
}
/**
* A chunk of memory out of which allocations are sliced.
*/
private static class Chunk {
/** Actual underlying data. */
private byte[] data;
private static final int UNINITIALIZED = -1;
private static final int OOM = -2;
/**
* Offset for the next allocation, or the sentinel value -1
* which implies that the chunk is still uninitialized.
*/
private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
/** Total number of allocations satisfied from this buffer. */
private AtomicInteger allocCount = new AtomicInteger();
/** Size of chunk in bytes. */
private final int size;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
*/
private Chunk(int size) {
this.size = size;
}
/**
* Actually claim the memory for this chunk. This should only be called from
* the thread that constructed the chunk. It is thread-safe against other
* threads calling alloc(), who will block until the allocation is complete.
*/
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
data = new byte[size];
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
boolean okInit = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
assert okInit; // single-threaded call
}
/**
* Try to allocate <code>size</code> bytes from the chunk.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
while (true) {
int oldOffset = nextFreeOffset.get();
if (oldOffset == UNINITIALIZED) {
// Other thread allocating it right now
Thread.yield();
continue;
}
if (oldOffset == OOM) {
return -1;
}
if (oldOffset + size > data.length) {
return -1; // alloc doesn't fit
}
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc
allocCount.incrementAndGet();
return oldOffset;
}
// lost race
}
}
@Override
public String toString() {
return "Chunk@" + System.identityHashCode(this) + ": used(" + allocCount.get() + "), free("
+ (data.length - nextFreeOffset.get() + ")");
}
}
/**
* The result of a single allocation. Contains the chunk that the
* allocation points into, and the offset in this array where the
* slice begins.
*/
public static class MemorySlice {
private final byte[] data;
private final int offset;
private MemorySlice(byte[] data, int off) {
this.data = data;
this.offset = off;
}
@Override
public String toString() {
return "Slice:" + "capacity(" + data.length + "), offset(" + offset + ")";
}
byte[] getData() {
return data;
}
int getOffset() {
return offset;
}
}
}