Added pointer support for the Unsafe allocator and the UnsafeMemoryManagerServiceImpl
git-svn-id: https://svn.apache.org/repos/asf/directmemory/trunk@1402298 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
index 8e6ccd4..551c1a2 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
@@ -32,7 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MemoryManagerServiceImpl<V> extends AbstractMemoryManager<V>
+public class MemoryManagerServiceImpl<V>
+ extends AbstractMemoryManager<V>
implements MemoryManagerService<V>
{
@@ -75,7 +76,6 @@
logger.info( format( "MemoryManager initialized - %d buffers, %s each", numberOfBuffers, Ram.inMb( size ) ) );
}
-
protected Allocator instanciateByteBufferAllocator( final int allocatorNumber, final int size )
{
final MergingByteBufferAllocatorImpl allocator = new MergingByteBufferAllocatorImpl( allocatorNumber, size );
@@ -127,7 +127,7 @@
p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
- buffer.writerIndex(0);
+ buffer.writerIndex( 0 );
buffer.writeBytes( payload );
used.addAndGet( payload.length );
@@ -149,7 +149,7 @@
pointer.hit();
final MemoryBuffer buf = pointer.getMemoryBuffer();
- buf.readerIndex(0);
+ buf.readerIndex( 0 );
final byte[] swp = new byte[(int) buf.readableBytes()];
buf.readBytes( swp );
@@ -157,12 +157,12 @@
}
@Override
- public Pointer<V> free( final Pointer<V> pointer )
+ public Pointer<V> free( final Pointer<V> pointer )
{
if ( !pointers.remove( pointer ) )
{
// pointers has been already freed.
- //throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
+ // throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
return pointer;
}
@@ -171,7 +171,7 @@
used.addAndGet( -pointer.getCapacity() );
pointer.setFree( true );
-
+
return pointer;
}
@@ -186,7 +186,6 @@
return totalCapacity;
}
-
protected List<Allocator> getAllocators()
{
return allocators;
@@ -254,9 +253,8 @@
final long expires )
{
- Pointer<V> p = new PointerImpl<V>();
-
- p.setMemoryBuffer(buffer);
+ Pointer<V> p = new PointerImpl<V>( buffer );
+
p.setExpiration( expires, expiresIn );
p.setBufferNumber( allocatorIndex );
p.setFree( false );
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
index 6ac177c..b41328b 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
@@ -42,14 +42,8 @@
void setBufferNumber( int bufferNumber );
- long getStart();
-
- void setStart( long address );
-
long getSize();
- void setEnd( long l );
-
void hit();
Class<? extends T> getClazz();
@@ -58,14 +52,12 @@
MemoryBuffer getMemoryBuffer();
- void setMemoryBuffer(MemoryBuffer memoryBuffer);
-
void createdNow();
void setExpiration( long expires, long expiresIn );
-
+
long getExpires();
-
+
long getExpiresIn();
}
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
index 615bd5f..ee808f3 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
@@ -19,19 +19,16 @@
* under the License.
*/
-import org.apache.directmemory.memory.buffer.MemoryBuffer;
-
-import static java.lang.System.currentTimeMillis;
import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
-import java.nio.ByteBuffer;
+import org.apache.directmemory.memory.buffer.MemoryBuffer;
public class PointerImpl<T>
implements Pointer<T>
{
- public long start;
- public long size;
+ public final MemoryBuffer memoryBuffer;
public long created;
@@ -49,16 +46,9 @@
public Class<? extends T> clazz;
- public MemoryBuffer memoryBuffer = null;
-
- public PointerImpl()
+ public PointerImpl( MemoryBuffer memoryBuffer )
{
- }
-
- public PointerImpl( long start, long end )
- {
- this.start = start;
- this.size = end;
+ this.memoryBuffer = memoryBuffer;
}
@Override
@@ -76,16 +66,13 @@
@Override
public long getCapacity()
{
- if (memoryBuffer != null)
- return memoryBuffer == null ? size - start + 1 : memoryBuffer.capacity();
- else
- return size;
+ return memoryBuffer == null ? -1 : memoryBuffer.capacity();
}
@Override
public String toString()
{
- return format( "%s[%s, %s] %s free", getClass().getSimpleName(), start, size, ( free ? "" : "not" ) );
+ return format( "%s[%s] %s free", getClass().getSimpleName(), getSize(), ( free ? "" : "not" ) );
}
@Override
@@ -97,7 +84,7 @@
hits = 0;
expiresIn = 0;
clazz = null;
- memoryBuffer = null;
+ memoryBuffer.clear();
}
@Override
@@ -123,21 +110,9 @@
}
@Override
- public long getStart()
- {
- return start;
- }
-
- @Override
public long getSize()
{
- return size;
- }
-
- @Override
- public void setStart( long start )
- {
- this.start = start;
+ return memoryBuffer.capacity();
}
@Override
@@ -166,26 +141,12 @@
}
@Override
- public void setEnd( long end )
- {
- this.size = end;
- }
-
- @Override
public void setClazz( Class<? extends T> clazz )
{
this.clazz = clazz;
}
@Override
- public void setMemoryBuffer(MemoryBuffer memoryBuffer)
- {
- this.memoryBuffer = memoryBuffer;
- this.start = 0;
- this.size = memoryBuffer.capacity();
- }
-
- @Override
public void createdNow()
{
created = System.currentTimeMillis();
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
index afca45f..4e4a5df 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
@@ -5,15 +5,15 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.directmemory.memory.allocator.Allocator;
+import org.apache.directmemory.memory.allocator.FixedSizeUnsafeAllocatorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.misc.Unsafe;
-
-public class UnsafeMemoryManagerServiceImpl<V> extends AbstractMemoryManager<V>
- implements MemoryManagerService<V>
+public class UnsafeMemoryManagerServiceImpl<V>
+ extends AbstractMemoryManager<V>
+ implements MemoryManagerService<V>
{
protected static final long NEVER_EXPIRES = 0L;
@@ -22,32 +22,25 @@
private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer<V>, Boolean>() );
-// protected final AtomicLong used = new AtomicLong( 0L );
+ private Allocator allocator;
- protected Unsafe unsafe = null;
-
+ // protected final AtomicLong used = new AtomicLong( 0L );
+
private long capacity;
-
+
+ private int size;
+
@Override
public void init( int numberOfBuffers, int size )
{
this.capacity = numberOfBuffers * size;
- try
- {
- java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- unsafe = (Unsafe)field.get(null);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ this.size = size;
+ this.allocator = new FixedSizeUnsafeAllocatorImpl( numberOfBuffers, size );
}
-
- protected Pointer<V> instanciatePointer( final long expiresIn ,final long expires)
- {
- Pointer<V> p = new PointerImpl<V>();
+ protected Pointer<V> instanciatePointer( final long expiresIn, final long expires )
+ {
+ Pointer<V> p = new PointerImpl<V>( allocator.allocate( size ) );
p.setExpiration( expires, expiresIn );
p.setFree( false );
@@ -61,7 +54,7 @@
@Override
public Pointer<V> store( byte[] payload, long expiresIn )
{
- if (capacity - used.get() - payload.length < 0)
+ if ( capacity - used.get() - payload.length < 0 )
{
if ( returnsNullWhenFull() )
{
@@ -72,54 +65,45 @@
throw new BufferOverflowException();
}
}
-
- Pointer<V> p = instanciatePointer(expiresIn, NEVER_EXPIRES );
- final long address = unsafe.allocateMemory( payload.length );
- for ( int i = 0; i < payload.length; i++ )
- {
- unsafe.putByte( address+i, payload[i] );
- }
- p.setStart( address );
- p.setEnd( payload.length );
-
+
+ Pointer<V> p = instanciatePointer( expiresIn, NEVER_EXPIRES );
+ p.getMemoryBuffer().writeBytes( payload );
+
used.addAndGet( payload.length );
// 2nd version
// unsafe.copyMemory( srcAddress, address, payload.length );
return p;
}
-// @Override
-// public Pointer<V> store( byte[] payload )
-// {
-// return store(payload, 0);
-// }
+ // @Override
+ // public Pointer<V> store( byte[] payload )
+ // {
+ // return store(payload, 0);
+ // }
-// @Override
-// public Pointer<V> update( Pointer<V> pointer, byte[] payload )
-// {
-// free(pointer);
-// return store(payload, pointer.getExpiresIn());
-// }
+ // @Override
+ // public Pointer<V> update( Pointer<V> pointer, byte[] payload )
+ // {
+ // free(pointer);
+ // return store(payload, pointer.getExpiresIn());
+ // }
@Override
public byte[] retrieve( Pointer<V> pointer )
{
final byte[] swp = new byte[(int) pointer.getSize()];
-
- for ( int i = 0; i < swp.length; i++ )
- {
- swp[i] = unsafe.getByte( i + pointer.getStart() );
- }
-
+
+ pointer.getMemoryBuffer().readBytes( swp );
+
return swp;
}
@Override
public Pointer<V> free( Pointer<V> pointer )
{
- unsafe.freeMemory( pointer.getStart() );
+ allocator.free( pointer.getMemoryBuffer() );
pointers.remove( pointer );
- used.set(used.get() - (pointer.getSize()));
+ used.set( used.get() - ( pointer.getSize() ) );
pointer.setFree( true );
return pointer;
}
@@ -129,8 +113,8 @@
{
for ( Iterator<Pointer<V>> iterator = pointers.iterator(); iterator.hasNext(); )
{
- Pointer<V> pointer = (Pointer<V>) iterator.next();
- free(pointer);
+ Pointer<V> pointer = iterator.next();
+ free( pointer );
}
}
@@ -139,33 +123,33 @@
{
return capacity;
}
-//
-// @Override
-// public long used()
-// {
-// // TODO Auto-generated method stub
-// return used.get();
-// }
-//
-// @Override
-// public long collectExpired()
-// {
-// // TODO Auto-generated method stub
-// return 0;
-// }
-//
-// @Override
-// public void collectLFU()
-// {
-// // TODO Auto-generated method stub
-//
-// }
-//
-// @Override
-// public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
-// {
-// // TODO Auto-generated method stub
-// return null;
-// }
+ //
+ // @Override
+ // public long used()
+ // {
+ // // TODO Auto-generated method stub
+ // return used.get();
+ // }
+ //
+ // @Override
+ // public long collectExpired()
+ // {
+ // // TODO Auto-generated method stub
+ // return 0;
+ // }
+ //
+ // @Override
+ // public void collectLFU()
+ // {
+ // // TODO Auto-generated method stub
+ //
+ // }
+ //
+ // @Override
+ // public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+ // {
+ // // TODO Auto-generated method stub
+ // return null;
+ // }
}
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
index b1da057..cc80b43 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
@@ -1,10 +1,11 @@
package org.apache.directmemory.memory.allocator;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteOrder;
import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.directmemory.memory.buffer.AbstractMemoryBuffer;
import org.apache.directmemory.memory.buffer.MemoryBuffer;
@@ -15,19 +16,33 @@
private final sun.misc.Unsafe unsafe = UnsafeUtils.getUnsafe();
- private final Set<UnsafeMemoryBuffer> memoryBuffers = new ConcurrentSkipListSet<UnsafeMemoryBuffer>();
+ private final Queue<UnsafeMemoryBuffer> memoryBuffers = new ConcurrentLinkedQueue<UnsafeMemoryBuffer>();
private final int number;
- public FixedSizeUnsafeAllocatorImpl( int number )
+ private final int size;
+
+ // Tells if it returns null or throw an BufferOverflowException when the requested size is bigger than the size of
+ // the slices
+ private final boolean returnNullWhenOversizingSliceSize = true;
+
+ public FixedSizeUnsafeAllocatorImpl( int number, int size )
{
this.number = number;
+ this.size = size;
if ( unsafe == null )
{
throw new IllegalStateException( "This JVM has no sun.misc.Unsafe support, "
+ "please choose another MemoryManager implementation" );
}
+
+ for ( int i = 0; i < number; i++ )
+ {
+ long baseAddress = unsafe.allocateMemory( size );
+ UnsafeMemoryBuffer memoryBuffer = new UnsafeMemoryBuffer( baseAddress, size );
+ memoryBuffers.add( memoryBuffer );
+ }
}
@Override
@@ -35,22 +50,26 @@
throws IOException
{
clear();
+ Iterator<UnsafeMemoryBuffer> iterator = memoryBuffers.iterator();
+ while ( iterator.hasNext() )
+ {
+ UnsafeMemoryBuffer memoryBuffer = iterator.next();
+ memoryBuffer.free();
+ iterator.remove();
+ }
}
@Override
public void free( MemoryBuffer memoryBuffer )
{
- memoryBuffer.free();
- memoryBuffers.remove( memoryBuffer );
+ memoryBuffer.clear();
+ memoryBuffers.offer( (UnsafeMemoryBuffer) memoryBuffer );
}
@Override
public MemoryBuffer allocate( int size )
{
- long baseAddress = unsafe.allocateMemory( size );
- UnsafeMemoryBuffer memoryBuffer = new UnsafeMemoryBuffer( baseAddress, size );
- memoryBuffers.add( memoryBuffer );
- return memoryBuffer;
+ return findFreeBuffer( size );
}
@Override
@@ -61,7 +80,6 @@
{
UnsafeMemoryBuffer memoryBuffer = iterator.next();
unsafe.setMemory( memoryBuffer.baseAddress, memoryBuffer.capacity, (byte) 0 );
- iterator.remove();
}
}
@@ -83,6 +101,24 @@
return number;
}
+ protected MemoryBuffer findFreeBuffer( int capacity )
+ {
+ // ensure the requested size is not bigger than the slices' size
+ if ( capacity > size )
+ {
+ if ( returnNullWhenOversizingSliceSize )
+ {
+ return null;
+ }
+ else
+ {
+ throw new BufferOverflowException();
+ }
+ }
+ // TODO : Add capacity to wait till a given timeout for a freed buffer
+ return memoryBuffers.poll();
+ }
+
private class UnsafeMemoryBuffer
extends AbstractMemoryBuffer
{
diff --git a/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java b/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
index 0cec1b6..3adc0a5 100644
--- a/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
+++ b/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
@@ -28,7 +28,6 @@
import java.util.zip.Checksum;
import org.apache.directmemory.measures.Ram;
-import org.apache.directmemory.memory.Pointer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,15 +44,16 @@
public class BaseTest
extends AbstractBenchmark
{
-
+
MemoryManagerService<Object> mem;
-
+
@Before
public void initMMS()
{
mem = new MemoryManagerServiceImpl<Object>();
mem.init( 1, 1 * 1024 * 1024 );
}
+
@Test
public void smokeTest()
{
@@ -62,7 +62,7 @@
Random rnd = new Random();
- int size = rnd.nextInt( 10 ) * (int)mem.capacity() / 100;
+ int size = rnd.nextInt( 10 ) * (int) mem.capacity() / 100;
logger.info( "size=" + size );
@@ -74,7 +74,6 @@
assertEquals( 0, mem.used() );
}
-
private static Logger logger = LoggerFactory.getLogger( MallocTest.class );
private static Random rnd = new Random();
@@ -97,8 +96,8 @@
public static void setup()
{
rnd = new Random();
-// logger.info("off-heap allocated: " + Ram.inMb(mem.capacity()));
-// logger.info("off-heap used: " + Ram.inMb(mem.used()));
+ // logger.info("off-heap allocated: " + Ram.inMb(mem.capacity()));
+ // logger.info("off-heap used: " + Ram.inMb(mem.used()));
logger.info( "test - size: " + test.size() );
logger.info( "test - errors: " + errors );
logger.info( "heap - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
@@ -165,11 +164,12 @@
{
byte[] payload = ( test + " - " + i ).getBytes();
Pointer<Object> p = mem.store( payload );
- logger.info( "p.start=" + p.getStart() );
+ // logger.info( "p.start=" + p.getStart() );
logger.info( "p.end=" + p.getSize() );
if ( lastP != null )
{
- assertEquals( lastP.getSize() + 1, p.getStart() );
+ // TODO CE: this check may not work for native memory access like sun.misc.Unsafe
+ // assertEquals( lastP.getSize() + 1, p.getStart() );
}
assertEquals( p.getCapacity(), payload.length );
lastP = p;