Added pointer support for the Unsafe allocator and the UnsafeMemoryManagerServiceImpl

git-svn-id: https://svn.apache.org/repos/asf/directmemory/trunk@1402298 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
index 8e6ccd4..551c1a2 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/MemoryManagerServiceImpl.java
@@ -32,7 +32,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MemoryManagerServiceImpl<V> extends AbstractMemoryManager<V>
+public class MemoryManagerServiceImpl<V>
+    extends AbstractMemoryManager<V>
     implements MemoryManagerService<V>
 {
 
@@ -75,7 +76,6 @@
         logger.info( format( "MemoryManager initialized - %d buffers, %s each", numberOfBuffers, Ram.inMb( size ) ) );
     }
 
-
     protected Allocator instanciateByteBufferAllocator( final int allocatorNumber, final int size )
     {
         final MergingByteBufferAllocatorImpl allocator = new MergingByteBufferAllocatorImpl( allocatorNumber, size );
@@ -127,7 +127,7 @@
 
             p = instanciatePointer( buffer, allocator.getNumber(), expiresIn, NEVER_EXPIRES );
 
-            buffer.writerIndex(0);
+            buffer.writerIndex( 0 );
             buffer.writeBytes( payload );
 
             used.addAndGet( payload.length );
@@ -149,7 +149,7 @@
         pointer.hit();
 
         final MemoryBuffer buf = pointer.getMemoryBuffer();
-        buf.readerIndex(0);
+        buf.readerIndex( 0 );
 
         final byte[] swp = new byte[(int) buf.readableBytes()];
         buf.readBytes( swp );
@@ -157,12 +157,12 @@
     }
 
     @Override
-    public Pointer<V>  free( final Pointer<V> pointer )
+    public Pointer<V> free( final Pointer<V> pointer )
     {
         if ( !pointers.remove( pointer ) )
         {
             // pointers has been already freed.
-            //throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
+            // throw new IllegalArgumentException( "This pointer " + pointer + " has already been freed" );
             return pointer;
         }
 
@@ -171,7 +171,7 @@
         used.addAndGet( -pointer.getCapacity() );
 
         pointer.setFree( true );
-        
+
         return pointer;
     }
 
@@ -186,7 +186,6 @@
         return totalCapacity;
     }
 
-  
     protected List<Allocator> getAllocators()
     {
         return allocators;
@@ -254,9 +253,8 @@
                                              final long expires )
     {
 
-        Pointer<V> p = new PointerImpl<V>();
-        
-        p.setMemoryBuffer(buffer);
+        Pointer<V> p = new PointerImpl<V>( buffer );
+
         p.setExpiration( expires, expiresIn );
         p.setBufferNumber( allocatorIndex );
         p.setFree( false );
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
index 6ac177c..b41328b 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/Pointer.java
@@ -42,14 +42,8 @@
 
     void setBufferNumber( int bufferNumber );
 
-    long getStart();
-
-    void setStart( long address );
-
     long getSize();
 
-    void setEnd( long l );
-
     void hit();
 
     Class<? extends T> getClazz();
@@ -58,14 +52,12 @@
 
     MemoryBuffer getMemoryBuffer();
 
-    void setMemoryBuffer(MemoryBuffer memoryBuffer);
-
     void createdNow();
 
     void setExpiration( long expires, long expiresIn );
-    
+
     long getExpires();
-    
+
     long getExpiresIn();
 
 }
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
index 615bd5f..ee808f3 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/PointerImpl.java
@@ -19,19 +19,16 @@
  * under the License.
  */
 
-import org.apache.directmemory.memory.buffer.MemoryBuffer;
-
-import static java.lang.System.currentTimeMillis;
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 
-import java.nio.ByteBuffer;
+import org.apache.directmemory.memory.buffer.MemoryBuffer;
 
 public class PointerImpl<T>
     implements Pointer<T>
 {
-    public long start;
 
-    public long size;
+    public final MemoryBuffer memoryBuffer;
 
     public long created;
 
@@ -49,16 +46,9 @@
 
     public Class<? extends T> clazz;
 
-    public MemoryBuffer memoryBuffer = null;
-
-    public PointerImpl()
+    public PointerImpl( MemoryBuffer memoryBuffer )
     {
-    }
-
-    public PointerImpl( long start, long end )
-    {
-        this.start = start;
-        this.size = end;
+        this.memoryBuffer = memoryBuffer;
     }
 
     @Override
@@ -76,16 +66,13 @@
     @Override
     public long getCapacity()
     {
-        if (memoryBuffer != null)
-            return memoryBuffer == null ? size - start + 1 : memoryBuffer.capacity();
-        else
-            return size;
+        return memoryBuffer == null ? -1 : memoryBuffer.capacity();
     }
 
     @Override
     public String toString()
     {
-        return format( "%s[%s, %s] %s free", getClass().getSimpleName(), start, size, ( free ? "" : "not" ) );
+        return format( "%s[%s] %s free", getClass().getSimpleName(), getSize(), ( free ? "" : "not" ) );
     }
 
     @Override
@@ -97,7 +84,7 @@
         hits = 0;
         expiresIn = 0;
         clazz = null;
-        memoryBuffer = null;
+        memoryBuffer.clear();
     }
 
     @Override
@@ -123,21 +110,9 @@
     }
 
     @Override
-    public long getStart()
-    {
-        return start;
-    }
-
-    @Override
     public long getSize()
     {
-        return size;
-    }
-
-    @Override
-    public void setStart( long start )
-    {
-        this.start = start;
+        return memoryBuffer.capacity();
     }
 
     @Override
@@ -166,26 +141,12 @@
     }
 
     @Override
-    public void setEnd( long end )
-    {
-        this.size = end;
-    }
-
-    @Override
     public void setClazz( Class<? extends T> clazz )
     {
         this.clazz = clazz;
     }
 
     @Override
-    public void setMemoryBuffer(MemoryBuffer memoryBuffer)
-    {
-        this.memoryBuffer = memoryBuffer;
-        this.start = 0;
-        this.size = memoryBuffer.capacity();
-    }
-
-    @Override
     public void createdNow()
     {
         created = System.currentTimeMillis();
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
index afca45f..4e4a5df 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/UnsafeMemoryManagerServiceImpl.java
@@ -5,15 +5,15 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.directmemory.memory.allocator.Allocator;
+import org.apache.directmemory.memory.allocator.FixedSizeUnsafeAllocatorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import sun.misc.Unsafe;
-
-public class UnsafeMemoryManagerServiceImpl<V> extends AbstractMemoryManager<V>
-    implements MemoryManagerService<V> 
+public class UnsafeMemoryManagerServiceImpl<V>
+    extends AbstractMemoryManager<V>
+    implements MemoryManagerService<V>
 {
 
     protected static final long NEVER_EXPIRES = 0L;
@@ -22,32 +22,25 @@
 
     private final Set<Pointer<V>> pointers = Collections.newSetFromMap( new ConcurrentHashMap<Pointer<V>, Boolean>() );
 
-//    protected final AtomicLong used = new AtomicLong( 0L );
+    private Allocator allocator;
 
-    protected Unsafe unsafe = null;
-    
+    // protected final AtomicLong used = new AtomicLong( 0L );
+
     private long capacity;
-    
+
+    private int size;
+
     @Override
     public void init( int numberOfBuffers, int size )
     {
         this.capacity = numberOfBuffers * size;
-        try
-        {
-            java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe");
-            field.setAccessible(true);
-            unsafe = (Unsafe)field.get(null);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        this.size = size;
+        this.allocator = new FixedSizeUnsafeAllocatorImpl( numberOfBuffers, size );
     }
-    
-    protected Pointer<V> instanciatePointer( final long expiresIn ,final long expires)
-    {
 
-        Pointer<V> p = new PointerImpl<V>();
+    protected Pointer<V> instanciatePointer( final long expiresIn, final long expires )
+    {
+        Pointer<V> p = new PointerImpl<V>( allocator.allocate( size ) );
 
         p.setExpiration( expires, expiresIn );
         p.setFree( false );
@@ -61,7 +54,7 @@
     @Override
     public Pointer<V> store( byte[] payload, long expiresIn )
     {
-        if (capacity - used.get() - payload.length < 0)
+        if ( capacity - used.get() - payload.length < 0 )
         {
             if ( returnsNullWhenFull() )
             {
@@ -72,54 +65,45 @@
                 throw new BufferOverflowException();
             }
         }
-        
-        Pointer<V> p = instanciatePointer(expiresIn, NEVER_EXPIRES );
-        final long address = unsafe.allocateMemory( payload.length );
-        for ( int i = 0; i < payload.length; i++ )
-        {
-            unsafe.putByte( address+i, payload[i] );
-        }
-        p.setStart( address );
-        p.setEnd( payload.length );
-        
+
+        Pointer<V> p = instanciatePointer( expiresIn, NEVER_EXPIRES );
+        p.getMemoryBuffer().writeBytes( payload );
+
         used.addAndGet( payload.length );
         // 2nd version
         // unsafe.copyMemory( srcAddress, address, payload.length );
         return p;
     }
 
-//    @Override
-//    public Pointer<V> store( byte[] payload )
-//    {
-//        return store(payload, 0);
-//    }
+    // @Override
+    // public Pointer<V> store( byte[] payload )
+    // {
+    // return store(payload, 0);
+    // }
 
-//    @Override
-//    public Pointer<V> update( Pointer<V> pointer, byte[] payload )
-//    {
-//        free(pointer);
-//        return store(payload, pointer.getExpiresIn());
-//    }
+    // @Override
+    // public Pointer<V> update( Pointer<V> pointer, byte[] payload )
+    // {
+    // free(pointer);
+    // return store(payload, pointer.getExpiresIn());
+    // }
 
     @Override
     public byte[] retrieve( Pointer<V> pointer )
     {
         final byte[] swp = new byte[(int) pointer.getSize()];
-        
-        for ( int i = 0; i < swp.length; i++ )
-        {
-            swp[i] = unsafe.getByte( i + pointer.getStart() );
-        }
-        
+
+        pointer.getMemoryBuffer().readBytes( swp );
+
         return swp;
     }
 
     @Override
     public Pointer<V> free( Pointer<V> pointer )
     {
-        unsafe.freeMemory( pointer.getStart() );
+        allocator.free( pointer.getMemoryBuffer() );
         pointers.remove( pointer );
-        used.set(used.get() - (pointer.getSize()));
+        used.set( used.get() - ( pointer.getSize() ) );
         pointer.setFree( true );
         return pointer;
     }
@@ -129,8 +113,8 @@
     {
         for ( Iterator<Pointer<V>> iterator = pointers.iterator(); iterator.hasNext(); )
         {
-            Pointer<V> pointer = (Pointer<V>) iterator.next();
-            free(pointer);
+            Pointer<V> pointer = iterator.next();
+            free( pointer );
         }
     }
 
@@ -139,33 +123,33 @@
     {
         return capacity;
     }
-//
-//    @Override
-//    public long used()
-//    {
-//        // TODO Auto-generated method stub
-//        return used.get();
-//    }
-//
-//    @Override
-//    public long collectExpired()
-//    {
-//        // TODO Auto-generated method stub
-//        return 0;
-//    }
-//
-//    @Override
-//    public void collectLFU()
-//    {
-//        // TODO Auto-generated method stub
-//
-//    }
-//
-//    @Override
-//    public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
-//    {
-//        // TODO Auto-generated method stub
-//        return null;
-//    }
+    //
+    // @Override
+    // public long used()
+    // {
+    // // TODO Auto-generated method stub
+    // return used.get();
+    // }
+    //
+    // @Override
+    // public long collectExpired()
+    // {
+    // // TODO Auto-generated method stub
+    // return 0;
+    // }
+    //
+    // @Override
+    // public void collectLFU()
+    // {
+    // // TODO Auto-generated method stub
+    //
+    // }
+    //
+    // @Override
+    // public <T extends V> Pointer<V> allocate( Class<T> type, int size, long expiresIn, long expires )
+    // {
+    // // TODO Auto-generated method stub
+    // return null;
+    // }
 
 }
diff --git a/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java b/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
index b1da057..cc80b43 100644
--- a/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
+++ b/directmemory-cache/src/main/java/org/apache/directmemory/memory/allocator/FixedSizeUnsafeAllocatorImpl.java
@@ -1,10 +1,11 @@
 package org.apache.directmemory.memory.allocator;
 
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteOrder;
 import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.directmemory.memory.buffer.AbstractMemoryBuffer;
 import org.apache.directmemory.memory.buffer.MemoryBuffer;
@@ -15,19 +16,33 @@
 
     private final sun.misc.Unsafe unsafe = UnsafeUtils.getUnsafe();
 
-    private final Set<UnsafeMemoryBuffer> memoryBuffers = new ConcurrentSkipListSet<UnsafeMemoryBuffer>();
+    private final Queue<UnsafeMemoryBuffer> memoryBuffers = new ConcurrentLinkedQueue<UnsafeMemoryBuffer>();
 
     private final int number;
 
-    public FixedSizeUnsafeAllocatorImpl( int number )
+    private final int size;
+
+    // Tells if it returns null or throw an BufferOverflowException when the requested size is bigger than the size of
+    // the slices
+    private final boolean returnNullWhenOversizingSliceSize = true;
+
+    public FixedSizeUnsafeAllocatorImpl( int number, int size )
     {
         this.number = number;
+        this.size = size;
 
         if ( unsafe == null )
         {
             throw new IllegalStateException( "This JVM has no sun.misc.Unsafe support, "
                 + "please choose another MemoryManager implementation" );
         }
+
+        for ( int i = 0; i < number; i++ )
+        {
+            long baseAddress = unsafe.allocateMemory( size );
+            UnsafeMemoryBuffer memoryBuffer = new UnsafeMemoryBuffer( baseAddress, size );
+            memoryBuffers.add( memoryBuffer );
+        }
     }
 
     @Override
@@ -35,22 +50,26 @@
         throws IOException
     {
         clear();
+        Iterator<UnsafeMemoryBuffer> iterator = memoryBuffers.iterator();
+        while ( iterator.hasNext() )
+        {
+            UnsafeMemoryBuffer memoryBuffer = iterator.next();
+            memoryBuffer.free();
+            iterator.remove();
+        }
     }
 
     @Override
     public void free( MemoryBuffer memoryBuffer )
     {
-        memoryBuffer.free();
-        memoryBuffers.remove( memoryBuffer );
+        memoryBuffer.clear();
+        memoryBuffers.offer( (UnsafeMemoryBuffer) memoryBuffer );
     }
 
     @Override
     public MemoryBuffer allocate( int size )
     {
-        long baseAddress = unsafe.allocateMemory( size );
-        UnsafeMemoryBuffer memoryBuffer = new UnsafeMemoryBuffer( baseAddress, size );
-        memoryBuffers.add( memoryBuffer );
-        return memoryBuffer;
+        return findFreeBuffer( size );
     }
 
     @Override
@@ -61,7 +80,6 @@
         {
             UnsafeMemoryBuffer memoryBuffer = iterator.next();
             unsafe.setMemory( memoryBuffer.baseAddress, memoryBuffer.capacity, (byte) 0 );
-            iterator.remove();
         }
     }
 
@@ -83,6 +101,24 @@
         return number;
     }
 
+    protected MemoryBuffer findFreeBuffer( int capacity )
+    {
+        // ensure the requested size is not bigger than the slices' size
+        if ( capacity > size )
+        {
+            if ( returnNullWhenOversizingSliceSize )
+            {
+                return null;
+            }
+            else
+            {
+                throw new BufferOverflowException();
+            }
+        }
+        // TODO : Add capacity to wait till a given timeout for a freed buffer
+        return memoryBuffers.poll();
+    }
+
     private class UnsafeMemoryBuffer
         extends AbstractMemoryBuffer
     {
diff --git a/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java b/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
index 0cec1b6..3adc0a5 100644
--- a/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
+++ b/directmemory-cache/src/test/java/org/apache/directmemory/memory/BaseTest.java
@@ -28,7 +28,6 @@
 import java.util.zip.Checksum;
 
 import org.apache.directmemory.measures.Ram;
-import org.apache.directmemory.memory.Pointer;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -45,15 +44,16 @@
 public class BaseTest
     extends AbstractBenchmark
 {
-    
+
     MemoryManagerService<Object> mem;
-    
+
     @Before
     public void initMMS()
     {
         mem = new MemoryManagerServiceImpl<Object>();
         mem.init( 1, 1 * 1024 * 1024 );
     }
+
     @Test
     public void smokeTest()
     {
@@ -62,7 +62,7 @@
 
         Random rnd = new Random();
 
-        int size = rnd.nextInt( 10 ) * (int)mem.capacity() / 100;
+        int size = rnd.nextInt( 10 ) * (int) mem.capacity() / 100;
 
         logger.info( "size=" + size );
 
@@ -74,7 +74,6 @@
         assertEquals( 0, mem.used() );
     }
 
-
     private static Logger logger = LoggerFactory.getLogger( MallocTest.class );
 
     private static Random rnd = new Random();
@@ -97,8 +96,8 @@
     public static void setup()
     {
         rnd = new Random();
-//		logger.info("off-heap allocated: " + Ram.inMb(mem.capacity()));
-//		logger.info("off-heap used:      " + Ram.inMb(mem.used()));
+        // logger.info("off-heap allocated: " + Ram.inMb(mem.capacity()));
+        // logger.info("off-heap used:      " + Ram.inMb(mem.used()));
         logger.info( "test - size: " + test.size() );
         logger.info( "test - errors: " + errors );
         logger.info( "heap - max: " + Ram.inMb( Runtime.getRuntime().maxMemory() ) );
@@ -165,11 +164,12 @@
         {
             byte[] payload = ( test + " - " + i ).getBytes();
             Pointer<Object> p = mem.store( payload );
-            logger.info( "p.start=" + p.getStart() );
+            // logger.info( "p.start=" + p.getStart() );
             logger.info( "p.end=" + p.getSize() );
             if ( lastP != null )
             {
-                assertEquals( lastP.getSize() + 1, p.getStart() );
+                // TODO CE: this check may not work for native memory access like sun.misc.Unsafe
+                // assertEquals( lastP.getSize() + 1, p.getStart() );
             }
             assertEquals( p.getCapacity(), payload.length );
             lastP = p;