blob: 368583c1a7a74a42243bee9a3931a64f314c6f56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mnemonic;
import org.apache.mnemonic.resgc.ContextWrapper;
import org.apache.mnemonic.resgc.ResCollector;
import org.apache.mnemonic.resgc.ResReclaim;
import org.apache.mnemonic.resgc.ReclaimContext;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.mnemonic.service.memory.MemoryServiceFeature.EXPANDABLE;
import static org.apache.mnemonic.service.memory.MemoryServiceFeature.SHRINKABLE;
/**
 * Manages a capacity-limited pool of system (off-heap) memory as an internal
 * volatile allocator.
 *
 * <p>Chunks are raw memory regions obtained through {@code sun.misc.Unsafe};
 * buffers are direct {@link ByteBuffer}s. The number of bytes currently handed
 * out is tracked in a counter that is compared with the capacity given at
 * construction time before every allocation.
 */
@SuppressWarnings("restriction")
public class SysMemAllocator extends CommonAllocator<SysMemAllocator> {

  /** whether an allocation that does not fit waits on the collector first. */
  private boolean m_activegc = true;

  /** timeout passed to the collector's waitReclaimCoolDown(). */
  private long m_gctimeout = 100;

  @SuppressWarnings({"restriction", "UseOfSunClasses"})
  private static sun.misc.Unsafe m_unsafe = null;

  /** number of bytes currently accounted to live chunks and buffers. */
  private final AtomicLong currentMemory = new AtomicLong(0L);

  /** upper bound for {@link #currentMemory}. */
  private final long maxStoreCapacity;

  /** size of every live chunk, keyed by its address. */
  private final Map<Long, Long> m_chunksize = new ConcurrentHashMap<Long, Long>();

  /** reflective handle of {@code Unsafe.invokeCleaner(ByteBuffer)}. */
  private static Method buf_clean_method;

  /**
   * Constructor, it initializes a memory pool backed by Java off-heap memory
   * with the specified capacity and installs the collectors that release
   * buffers and chunks.
   *
   * @param capacity
   *          specify the capacity of a system memory pool
   *
   * @param isnew
   *          a place holder, always specify it as true
   *
   * @throws Exception
   *           fail to retrieve the Unsafe object or its
   *           {@code invokeCleaner} method
   *
   */
  public SysMemAllocator(long capacity, boolean isnew) throws Exception {
    m_unsafe = Utils.getUnsafe();
    buf_clean_method = sun.misc.Unsafe.class.getMethod("invokeCleaner", java.nio.ByteBuffer.class);
    maxStoreCapacity = capacity;

    /**
     * create a resource collector to release specified bytebuffer that backed
     * by Java off-heap.
     */
    m_bufcollector = new ResCollector<MemBufferHolder<SysMemAllocator>, ByteBuffer>(new ResReclaim<ByteBuffer>() {
      @Override
      public synchronized void reclaim(ContextWrapper<ByteBuffer> rctx) {
        ByteBuffer mres = rctx.getRes();
        Long sz = Long.valueOf(mres.capacity());
        // a custom reclaimer, when installed, gets the first chance
        boolean cb_reclaimed = false;
        if (null != m_bufferreclaimer) {
          cb_reclaimed = m_bufferreclaimer.reclaim(mres, sz);
        }
        if (!cb_reclaimed) {
          try {
            buf_clean_method.invoke(m_unsafe, mres);
          } catch (Exception e) {
            throw new Error(e);
          }
        }
        currentMemory.addAndGet(-sz);
      }
    });

    /**
     * create a resource collector to release specified chunk that backed by
     * Java off-heap.
     */
    m_chunkcollector = new ResCollector<MemChunkHolder<SysMemAllocator>, Long>(new ResReclaim<Long>() {
      @Override
      public synchronized void reclaim(ContextWrapper<Long> rctx) {
        Long mres = rctx.getRes();
        // sz is null when no size was recorded for this address; it is
        // handed to the custom reclaimer unchanged in that case.
        Long sz = m_chunksize.remove(mres);
        boolean cb_reclaimed = false;
        if (null != m_chunkreclaimer) {
          cb_reclaimed = m_chunkreclaimer.reclaim(mres, sz);
        }
        if (!cb_reclaimed) {
          m_unsafe.freeMemory(mres);
        }
        if (null != sz) {
          currentMemory.addAndGet(-sz);
        }
      }
    });
  }

  /**
   * Tells whether the given number of additional bytes still fits under the
   * pool capacity. The comparison is done by subtraction so that a very large
   * request cannot overflow the sum and be accepted by mistake.
   *
   * @param size
   *          the number of bytes requested
   *
   * @return true if the accounted memory plus size does not exceed the
   *         capacity
   */
  private boolean hasCapacityFor(long size) {
    return size <= maxStoreCapacity - currentMemory.get();
  }

  /**
   * enable active garbage collection. When enabled, an allocation request that
   * does not fit into the pool first waits on the collector, using the given
   * timeout, before the capacity is checked again.
   *
   * @param timeout
   *          the timeout is used to yield for GC performing
   *
   * @return this allocator
   */
  @Override
  public SysMemAllocator enableActiveGC(long timeout) {
    m_activegc = true;
    m_gctimeout = timeout;
    return this;
  }

  /**
   * disable active garbage collection.
   *
   * @return this allocator
   */
  @Override
  public SysMemAllocator disableActiveGC() {
    m_activegc = false;
    return this;
  }

  /**
   * release the memory pool and close it. Delegates to the superclass.
   *
   */
  @Override
  public void close() {
    super.close();
  }

  /**
   * sync. dirty data to memory. Not supported by this allocator.
   *
   * @param addr
   *          specify the address
   *
   * @param length
   *          specify the length
   *
   * @param autodetect
   *          detect the length of specified memory block
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public void syncToVolatileMemory(long addr, long length, boolean autodetect) {
    throw new UnsupportedOperationException("SysMemAllocator doesn't support sync");
  }

  /**
   * sync. a buffer to memory. Not supported by this allocator.
   *
   * @param mbuf
   *          specify a buffer to be sync.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public void syncToVolatileMemory(MemBufferHolder<SysMemAllocator> mbuf) {
    throw new UnsupportedOperationException("SysMemAllocator doesn't support sync");
  }

  /**
   * sync. a chunk to memory. Not supported by this allocator.
   *
   * @param mchunk
   *          specify a chunk to be sync.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public void syncToVolatileMemory(MemChunkHolder<SysMemAllocator> mchunk) {
    throw new UnsupportedOperationException("SysMemAllocator doesn't support sync");
  }

  /**
   * sync. everything to memory. Not supported by this allocator.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public void syncAll() {
    throw new UnsupportedOperationException("SysMemAllocator doesn't support sync");
  }

  /**
   * re-size a specified chunk on its backed memory pool. On success the given
   * holder is cleared and destroyed and a new holder for the reallocated
   * memory is returned; the recorded chunk size and the accounted memory are
   * moved over to the new address.
   *
   * @param mholder
   *          the holder of memory chunk. It must not be null.
   *
   * @param size
   *          specify a new size of memory chunk
   *
   * @return the resized memory chunk handler, or null if size is not positive,
   *         the pool has no room for size more bytes, or the reallocation
   *         returned address 0
   */
  @Override
  public MemChunkHolder<SysMemAllocator> resizeChunk(MemChunkHolder<SysMemAllocator> mholder, long size) {
    MemChunkHolder<SysMemAllocator> ret = null;
    // only re-register the new holder if the old one was registered
    boolean ac = null != mholder.getRefId();
    if (size > 0) {
      if (!hasCapacityFor(size) && m_activegc) {
        m_chunkcollector.waitReclaimCoolDown(m_gctimeout);
      }
      if (hasCapacityFor(size)) {
        Long oldaddr = mholder.get();
        long addr = m_unsafe.reallocateMemory(oldaddr, size);
        if (0L != addr) {
          // The old holder is detached below, so nothing else will ever fix
          // up its bookkeeping: move the size record to the new address and
          // adjust the counter by the difference.
          // NOTE(review): assumes clear()+destroy() prevents the collector
          // from reclaiming the old address - confirm against ResHolder.
          Long oldsize = m_chunksize.remove(oldaddr);
          m_chunksize.put(addr, size);
          currentMemory.addAndGet(size - (null != oldsize ? oldsize : 0L));
          mholder.clear();
          mholder.destroy();
          // NOTE(review): unlike createChunk(), no setCollector() call is
          // made on the new holder - confirm whether that is intended.
          ret = new MemChunkHolder<SysMemAllocator>(this, addr, size);
          if (ac) {
            m_chunkcollector.register(ret);
          }
        }
      }
    }
    return ret;
  }

  /**
   * resize a specified buffer on its backed memory pool. On success the given
   * holder is cleared and destroyed and a new holder is returned whose buffer
   * keeps the old position and limit as far as they fit into the new size;
   * the accounted memory is adjusted by the capacity difference.
   *
   * @param mholder
   *          the holder of memory buffer. It must not be null.
   *
   * @param size
   *          specify a new size of memory buffer
   *
   * @return the resized memory buffer handler, or null if size is not
   *         positive, the pool has no room for size more bytes, or no new
   *         buffer could be obtained
   *
   * @throws IllegalArgumentException
   *           if size exceeds the maximum capacity of a ByteBuffer
   *
   */
  @Override
  public MemBufferHolder<SysMemAllocator> resizeBuffer(MemBufferHolder<SysMemAllocator> mholder, long size) {
    MemBufferHolder<SysMemAllocator> ret = null;
    // only re-register the new holder if the old one was registered
    boolean ac = null != mholder.getRefId();
    if (size > 0) {
      if (size > Integer.MAX_VALUE) {
        // a ByteBuffer is int-indexed; a narrowing cast would silently
        // produce a buffer of the wrong size
        throw new IllegalArgumentException("Buffer size exceeds the maximum ByteBuffer capacity: " + size);
      }
      ByteBuffer oldbuf = mholder.get();
      int bufpos = oldbuf.position();
      int buflimit = oldbuf.limit();
      int oldcap = oldbuf.capacity();
      if (!hasCapacityFor(size) && m_activegc) {
        m_bufcollector.waitReclaimCoolDown(m_gctimeout);
      }
      if (hasCapacityFor(size)) {
        ByteBuffer buf = Utils.resizeByteBuffer(oldbuf, size);
        if (null != buf) {
          mholder.clear();
          mholder.destroy();
          // Limit first: position() rejects values beyond the current limit,
          // and the limit left by the resize helper is not known here.
          buf.limit(buflimit <= size ? buflimit : (int) size);
          buf.position(bufpos <= size ? bufpos : 0);
          // Reclaiming subtracts the capacity of the buffer being released,
          // so the counter has to follow the capacity change now.
          // NOTE(review): assumes the old buffer is no longer reclaimed
          // through the collector after clear()+destroy() - confirm.
          currentMemory.addAndGet((long) buf.capacity() - oldcap);
          ret = new MemBufferHolder<SysMemAllocator>(this, buf);
          if (ac) {
            m_bufcollector.register(ret);
          }
        }
      }
    }
    return ret;
  }

  /**
   * create a memory chunk that is managed by its holder.
   *
   * @param size
   *          specify the size of memory chunk
   *
   * @param autoreclaim
   *          specify whether or not to reclaim this chunk automatically
   *
   * @param rctx
   *          specify a reclaim context
   *
   * @return a holder contains a memory chunk, or null if the pool has no room
   *         for it or the allocation returned address 0
   */
  @Override
  public MemChunkHolder<SysMemAllocator> createChunk(long size, boolean autoreclaim,
      ReclaimContext rctx) {
    MemChunkHolder<SysMemAllocator> ret = null;
    long addr = 0L;
    if (!hasCapacityFor(size) && m_activegc) {
      m_chunkcollector.waitReclaimCoolDown(m_gctimeout);
    }
    if (hasCapacityFor(size)) {
      addr = m_unsafe.allocateMemory(size);
    }
    if (0L != addr) {
      ret = new MemChunkHolder<SysMemAllocator>(this, addr, size);
      ret.setCollector(m_chunkcollector);
      if (autoreclaim) {
        m_chunkcollector.register(ret, rctx);
      }
      // remember the size so that reclaiming can give it back to the pool
      m_chunksize.put(addr, size);
      currentMemory.getAndAdd(size);
    }
    return ret;
  }

  /**
   * create a memory buffer that is managed by its holder.
   *
   * @param size
   *          specify the size of memory buffer
   *
   * @param autoreclaim
   *          specify whether or not to reclaim this buffer automatically
   *
   * @param rctx
   *          specify a reclaim context
   *
   * @return a holder contains a memory buffer, or null if the pool has no
   *         room for it
   *
   * @throws IllegalArgumentException
   *           if size is negative or exceeds the maximum capacity of a
   *           ByteBuffer
   */
  @Override
  public MemBufferHolder<SysMemAllocator> createBuffer(long size, boolean autoreclaim,
      ReclaimContext rctx) {
    if (size > Integer.MAX_VALUE) {
      // a ByteBuffer is int-indexed; a narrowing cast would silently allocate
      // a buffer of the wrong size while the full size gets accounted
      throw new IllegalArgumentException("Buffer size exceeds the maximum ByteBuffer capacity: " + size);
    }
    MemBufferHolder<SysMemAllocator> ret = null;
    ByteBuffer bb = null;
    if (!hasCapacityFor(size) && m_activegc) {
      m_bufcollector.waitReclaimCoolDown(m_gctimeout);
    }
    if (hasCapacityFor(size)) {
      bb = ByteBuffer.allocateDirect((int) size);
    }
    if (null != bb) {
      ret = new MemBufferHolder<SysMemAllocator>(this, bb);
      ret.setCollector(m_bufcollector);
      if (autoreclaim) {
        m_bufcollector.register(ret, rctx);
      }
      currentMemory.getAndAdd(size);
    }
    return ret;
  }

  /**
   * request to expand the memory pool. No memory is touched here; the
   * request is only validated against the configured features.
   *
   * @param size
   *          the size to expand by
   *
   * @return the given size if the EXPANDABLE feature is present
   *
   * @throws ConfigurationException
   *           if no features are configured or EXPANDABLE is not among them
   */
  @Override
  public long expand(long size) {
    if (null == m_features) {
      throw new ConfigurationException("Do not support features");
    }
    if (!m_features.contains(EXPANDABLE)) {
      throw new ConfigurationException("Do not support expand operation");
    }
    return size;
  }

  /**
   * request to shrink the memory pool. No memory is touched here; the
   * request is only validated against the configured features.
   *
   * @param size
   *          the size to shrink by
   *
   * @return the given size if the SHRINKABLE feature is present
   *
   * @throws ConfigurationException
   *           if no features are configured or SHRINKABLE is not among them
   */
  @Override
  public long shrink(long size) {
    if (null == m_features) {
      throw new ConfigurationException("Do not support features");
    }
    if (!m_features.contains(SHRINKABLE)) {
      throw new ConfigurationException("Do not support shrink operation");
    }
    return size;
  }
}