QPID-7465: [Java Broker] Free memory in MessageMetaData_0_10 when it is no longer needed
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 0e4d494..b308435 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -31,10 +35,6 @@
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Struct;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
public class MessageMetaData_0_10 implements StorableMessageMetaData
{
private Header _header;
@@ -83,7 +83,7 @@
return TYPE;
}
- public int getStorableSize()
+ public synchronized int getStorableSize()
{
QpidByteBuffer buf = _encoded;
@@ -93,13 +93,12 @@
_encoded = buf;
}
- //TODO -- need to add stuff
return buf.limit();
}
private QpidByteBuffer encodeAsBuffer()
{
- ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE);
+ ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
encoder.writeInt64(_arrivalTime);
encoder.writeInt32(_bodySize);
@@ -141,28 +140,18 @@
return buf;
}
- public int writeToBuffer(QpidByteBuffer dest)
+ public synchronized int writeToBuffer(QpidByteBuffer dest)
{
- QpidByteBuffer buf = _encoded;
-
- if(buf == null)
+ if (_encoded == null)
{
- buf = encodeAsBuffer();
- _encoded = buf;
+ _encoded = encodeAsBuffer();
}
-
- buf = buf.duplicate();
-
- buf.position(0);
-
- if(dest.remaining() < buf.limit())
- {
- buf.limit(dest.remaining());
- }
- dest.put(buf);
- final int length = buf.limit();
- buf.dispose();
- return length;
+ dest.put(_encoded);
+ final int bytesWritten = _encoded.limit();
+ // We have special knowledge that we no longer need the encoded form after this call
+ // to reduce memory usage associated with the metadata free the encoded form here (QPID-7465)
+ clearEncodedForm();
+ return bytesWritten;
}
public int getContentSize()
@@ -178,13 +167,17 @@
@Override
public void dispose()
{
-
+ clearEncodedForm();
}
@Override
- public void clearEncodedForm()
+ public synchronized void clearEncodedForm()
{
-
+ if (_encoded != null)
+ {
+ _encoded.dispose();
+ _encoded = null;
+ }
}
public String getRoutingKey()
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
index bd42b3f..655eafe 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -30,21 +30,23 @@
public final class ServerEncoder extends AbstractEncoder
{
- public static final int DEFAULT_CAPACITY = 8192;
+ public static final int DEFAULT_CAPACITY = 256 * 1024;
+ private final boolean _useDirectMemory;
private final int _threshold;
private QpidByteBuffer _out;
private int _initialCapacity;
public ServerEncoder()
{
- this(DEFAULT_CAPACITY);
+ this(QpidByteBuffer.getPooledBufferSize(), true);
}
- public ServerEncoder(int capacity)
+ public ServerEncoder(int capacity, boolean useDirectMemory)
{
- _initialCapacity = capacity;
- _threshold = capacity/16;
- _out = QpidByteBuffer.allocateDirect(capacity);
+ _useDirectMemory = useDirectMemory;
+ _initialCapacity = (capacity > 0 ? capacity : DEFAULT_CAPACITY);
+ _threshold = Math.min(_initialCapacity/16, 256);
+ _out = QpidByteBuffer.allocate(useDirectMemory, _initialCapacity);
}
public void init()
@@ -52,7 +54,7 @@
if(_out.capacity() < _threshold)
{
_out.dispose();
- _out = QpidByteBuffer.allocateDirect(_initialCapacity);
+ _out = QpidByteBuffer.allocate(_useDirectMemory, _initialCapacity);
}
else
{
@@ -81,7 +83,7 @@
{
QpidByteBuffer old = _out;
int capacity = old.capacity();
- _out = QpidByteBuffer.allocateDirect(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+ _out = QpidByteBuffer.allocate(_useDirectMemory, Math.max(Math.max(capacity + size, 2 * capacity), _initialCapacity));
old.flip();
_out.put(old);
old.dispose();
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 17f5abd..eec2a05 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -804,6 +804,11 @@
_isPoolInitialized = true;
}
+ public static int getPooledBufferSize()
+ {
+ return _pooledBufferSize;
+ }
+
private static final class BufferInputStream extends InputStream
{
private final QpidByteBuffer _qpidByteBuffer;