QPID-7465: [Java Broker] Free memory in MessageMetaData_0_10 when it is no longer needed
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 0e4d494..b308435 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -20,6 +20,10 @@
 */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -31,10 +35,6 @@
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Struct;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 public class MessageMetaData_0_10 implements StorableMessageMetaData
 {
     private Header _header;
@@ -83,7 +83,7 @@
         return TYPE;
     }
 
-    public int getStorableSize()
+    public synchronized int getStorableSize()
     {
         QpidByteBuffer buf = _encoded;
 
@@ -93,13 +93,12 @@
             _encoded = buf;
         }
 
-        //TODO -- need to add stuff
         return buf.limit();
     }
 
     private QpidByteBuffer encodeAsBuffer()
     {
-        ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE);
+        ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
 
         encoder.writeInt64(_arrivalTime);
         encoder.writeInt32(_bodySize);
@@ -141,28 +140,18 @@
         return buf;
     }
 
-    public int writeToBuffer(QpidByteBuffer dest)
+    public synchronized int writeToBuffer(QpidByteBuffer dest)
     {
-        QpidByteBuffer buf = _encoded;
-
-        if(buf == null)
+        if (_encoded == null)
         {
-            buf = encodeAsBuffer();
-            _encoded = buf;
+            _encoded = encodeAsBuffer();
         }
-
-        buf = buf.duplicate();
-
-        buf.position(0);
-
-        if(dest.remaining() < buf.limit())
-        {
-            buf.limit(dest.remaining());
-        }
-        dest.put(buf);
-        final int length = buf.limit();
-        buf.dispose();
-        return length;
+        dest.put(_encoded);
+        final int bytesWritten = _encoded.limit();
+        // We have special knowledge that we no longer need the encoded form after this call
+        // to reduce memory usage associated with the metadata free the encoded form here (QPID-7465)
+        clearEncodedForm();
+        return bytesWritten;
     }
 
     public int getContentSize()
@@ -178,13 +167,17 @@
     @Override
     public void dispose()
     {
-
+        clearEncodedForm();
     }
 
     @Override
-    public void clearEncodedForm()
+    public synchronized void clearEncodedForm()
     {
-
+        if (_encoded != null)
+        {
+            _encoded.dispose();
+            _encoded = null;
+        }
     }
 
     public String getRoutingKey()
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
index bd42b3f..655eafe 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -30,21 +30,23 @@
 
 public final class ServerEncoder extends AbstractEncoder
 {
-    public static final int DEFAULT_CAPACITY = 8192;
+    public static final int DEFAULT_CAPACITY = 256 * 1024;
+    private final boolean _useDirectMemory;
     private final int _threshold;
     private QpidByteBuffer _out;
     private int _initialCapacity;
 
     public ServerEncoder()
     {
-        this(DEFAULT_CAPACITY);
+        this(QpidByteBuffer.getPooledBufferSize(), true);
     }
 
-    public ServerEncoder(int capacity)
+    public ServerEncoder(int capacity, boolean useDirectMemory)
     {
-        _initialCapacity = capacity;
-        _threshold = capacity/16;
-        _out = QpidByteBuffer.allocateDirect(capacity);
+        _useDirectMemory = useDirectMemory;
+        _initialCapacity = (capacity > 0 ? capacity : DEFAULT_CAPACITY);
+        _threshold = Math.min(_initialCapacity/16, 256);
+        _out = QpidByteBuffer.allocate(useDirectMemory, _initialCapacity);
     }
 
     public void init()
@@ -52,7 +54,7 @@
         if(_out.capacity() < _threshold)
         {
             _out.dispose();
-            _out = QpidByteBuffer.allocateDirect(_initialCapacity);
+            _out = QpidByteBuffer.allocate(_useDirectMemory, _initialCapacity);
         }
         else
         {
@@ -81,7 +83,7 @@
     {
         QpidByteBuffer old = _out;
         int capacity = old.capacity();
-        _out = QpidByteBuffer.allocateDirect(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+        _out = QpidByteBuffer.allocate(_useDirectMemory, Math.max(Math.max(capacity + size, 2 * capacity), _initialCapacity));
         old.flip();
         _out.put(old);
         old.dispose();
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 17f5abd..eec2a05 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -804,6 +804,11 @@
         _isPoolInitialized = true;
     }
 
+    public static int getPooledBufferSize()
+    {
+        return _pooledBufferSize;
+    }
+
     private static final class BufferInputStream extends InputStream
     {
         private final QpidByteBuffer _qpidByteBuffer;