blob: 7c4e288bbdd7bb3aacca8404c26d538fc204205f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;
import io.netty.buffer.*;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.internal.PlatformDependent;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
/* this class is PooledBuffer holder */
public class BufferPool {
public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache";
private static final ByteBufAllocator ALLOCATOR;
private BufferPool() {
}
static {
if (TajoConstants.IS_TEST_MODE) {
/* Disable pooling buffers for memory usage */
ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
/* if you are finding memory leak, please enable this line */
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
} else {
TajoConf tajoConf = new TajoConf();
ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, true), 0);
}
}
/**
* borrowed from Spark
*/
public static PooledByteBufAllocator createPooledByteBufAllocator(
boolean allowDirectBufs,
boolean allowCache,
int numCores) {
if (numCores == 0) {
numCores = Runtime.getRuntime().availableProcessors();
}
return new PooledByteBufAllocator(
allowDirectBufs && PlatformDependent.directBufferPreferred(),
Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
getPrivateStaticField("DEFAULT_PAGE_SIZE"),
getPrivateStaticField("DEFAULT_MAX_ORDER"),
allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
);
}
/** Used to get defaults from Netty's private static fields. */
private static int getPrivateStaticField(String name) {
try {
Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
f.setAccessible(true);
return f.getInt(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static long maxDirectMemory() {
return PlatformDependent.maxDirectMemory();
}
public static ByteBuf directBuffer(int size) {
return directBuffer(size, ByteOrder.LITTLE_ENDIAN);
}
/**
* @param size the initial capacity
* @param order the endianness
* @return allocated ByteBuf from pool
*/
public static ByteBuf directBuffer(int size, ByteOrder order) {
ByteBuf byteBuf = ALLOCATOR.directBuffer(size);
if (byteBuf.order() != order) byteBuf.order(order);
return byteBuf;
}
/**
* @param size the initial capacity
* @param max the max capacity
* @return allocated ByteBuf from pool
*/
public static ByteBuf directBuffer(int size, int max) {
return directBuffer(size, max, ByteOrder.LITTLE_ENDIAN);
}
/**
* @param size the initial capacity
* @param max the max capacity
* @param order the endianness
* @return allocated ByteBuf from pool
*/
public static ByteBuf directBuffer(int size, int max, ByteOrder order) {
ByteBuf byteBuf = ALLOCATOR.directBuffer(size, max);
if (byteBuf.order() != order) byteBuf.order(order);
return byteBuf;
}
/**
*
* @param size the initial capacity
* @param max the max capacity
* @return heap ByteBuf
*/
public static ByteBuf heapBuffer(int size, int max) {
return Unpooled.buffer(size, max).order(ByteOrder.LITTLE_ENDIAN);
}
@InterfaceStability.Unstable
public static void forceRelease(ByteBuf buf) {
buf.release(buf.refCnt());
}
/**
* the ByteBuf will increase to writable size
* @param buf
* @param minWritableBytes required minimum writable size
*/
public static ByteBuf ensureWritable(ByteBuf buf, int minWritableBytes) {
return buf.ensureWritable(minWritableBytes).order(ByteOrder.LITTLE_ENDIAN);
}
/**
* deallocate the specified direct memory
* @param byteBuffer
*/
public static void free(ByteBuffer byteBuffer) {
PlatformDependent.freeDirectBuffer(byteBuffer);
}
/**
* get the specified direct memory bean
*/
public static BufferPoolMXBean getDirectBufferPool() {
for (BufferPoolMXBean pool : getBufferPools()) {
if (pool.getName().equals("direct")) {
return pool;
}
}
return null;
}
/**
* get the specified mapped memory bean
*/
public static BufferPoolMXBean getMappedBufferPool() {
for (BufferPoolMXBean pool : getBufferPools()) {
if (pool.getName().equals("mapped")) {
return pool;
}
}
return null;
}
private static List<BufferPoolMXBean> getBufferPools() {
return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
}
}