QPID-8238: [Broker-J][AMQP 0-8..0-91] Optimize performance
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index e7d39ed..0351512 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -618,7 +618,8 @@
             }
 
             _receivedMessageCount.incrementAndGet();
-            _receivedMessageSize.addAndGet(message.getSizeIncludingHeader());
+            long sizeIncludingHeader = message.getSizeIncludingHeader();
+            _receivedMessageSize.addAndGet(sizeIncludingHeader);
 
             doRoute(message, routingAddress, instanceProperties, routingResult);
 
@@ -634,12 +635,12 @@
             if (routingResult.hasRoutes())
             {
                 _routedMessageCount.incrementAndGet();
-                _routedMessageSize.addAndGet(message.getSizeIncludingHeader());
+                _routedMessageSize.addAndGet(sizeIncludingHeader);
             }
             else
             {
                 _droppedMessageCount.incrementAndGet();
-                _droppedMessageSize.addAndGet(message.getSizeIncludingHeader());
+                _droppedMessageSize.addAndGet(sizeIncludingHeader);
             }
 
             return routingResult;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index a15bdc1..c958876 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -49,10 +49,12 @@
             return new Filterable()
             {
 
+                private final AMQMessageHeader _messageHeader = message.getMessageHeader();
+
                 @Override
                 public AMQMessageHeader getMessageHeader()
                 {
-                    return message.getMessageHeader();
+                    return _messageHeader;
                 }
 
                 @Override
@@ -88,49 +90,49 @@
                 @Override
                 public String getReplyTo()
                 {
-                    return message.getMessageHeader().getReplyTo();
+                    return _messageHeader.getReplyTo();
                 }
 
                 @Override
                 public String getType()
                 {
-                    return message.getMessageHeader().getType();
+                    return _messageHeader.getType();
                 }
 
                 @Override
                 public byte getPriority()
                 {
-                    return message.getMessageHeader().getPriority();
+                    return _messageHeader.getPriority();
                 }
 
                 @Override
                 public String getMessageId()
                 {
-                    return message.getMessageHeader().getMessageId();
+                    return _messageHeader.getMessageId();
                 }
 
                 @Override
                 public long getTimestamp()
                 {
-                    return message.getMessageHeader().getTimestamp();
+                    return _messageHeader.getTimestamp();
                 }
 
                 @Override
                 public String getCorrelationId()
                 {
-                    return message.getMessageHeader().getCorrelationId();
+                    return _messageHeader.getCorrelationId();
                 }
 
                 @Override
                 public long getExpiration()
                 {
-                    return message.getMessageHeader().getExpiration();
+                    return _messageHeader.getExpiration();
                 }
 
                 @Override
                 public Object getHeader(String name)
                 {
-                    return message.getMessageHeader().getHeader(name);
+                    return _messageHeader.getHeader(name);
                 }
 
                 @Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c618ccf..41b10ca 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -349,15 +349,11 @@
                 }
 
                 long bodySize = _currentMessage.getSize();
-                long timestamp = contentHeader.getProperties().getTimestamp();
-
                 try
                 {
 
-                    final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
-
                     final MessageMetaData messageMetaData =
-                            new MessageMetaData(messagePublishInfo,
+                            new MessageMetaData(info,
                                                 contentHeader,
                                                 getConnection().getLastReadTime());
 
@@ -381,8 +377,6 @@
                         _currentMessage = null;
 
 
-                        final boolean immediate = messagePublishInfo.isImmediate();
-
                         final InstanceProperties instanceProperties =
                                 new InstanceProperties()
                                 {
@@ -394,11 +388,11 @@
                                             case EXPIRATION:
                                                 return amqMessage.getExpiration();
                                             case IMMEDIATE:
-                                                return immediate;
+                                                return amqMessage.isImmediate();
                                             case PERSISTENT:
                                                 return amqMessage.isPersistent();
                                             case MANDATORY:
-                                                return messagePublishInfo.isMandatory();
+                                                return amqMessage.isMandatory();
                                             case REDELIVERED:
                                                 return false;
                                         }
@@ -411,7 +405,7 @@
                                                   amqMessage.getInitialRoutingAddress(),
                                                   instanceProperties);
 
-                        int enqueues = result.send(_transaction, immediate ? _immediateAction : null);
+                        int enqueues = result.send(_transaction, amqMessage.isImmediate() ? _immediateAction : null);
                         if (enqueues == 0)
                         {
                             boolean mandatory = amqMessage.isMandatory();
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index faab357..7c663ff 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -62,13 +62,10 @@
     public String getInitialRoutingAddress()
     {
         MessageMetaData messageMetaData = getMessageMetaData();
-        if (messageMetaData != null)
+        AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+        if (routingKey != null)
         {
-            AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
-            if (routingKey != null)
-            {
-                return routingKey.toString();
-            }
+            return routingKey.toString();
         }
         return "";
     }
@@ -115,7 +112,7 @@
     @Override
     public long getExpiration()
     {
-        return getMessageHeader().getExpiration();
+        return getMessageMetaData().getContentHeaderBody().getProperties().getExpiration();
     }
 
     @Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 1defedb..74b323b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -44,7 +44,7 @@
     private final ContentHeaderBody _contentHeaderBody;
 
 
-    private long _arrivalTime;
+    private final long _arrivalTime;
     private static final byte MANDATORY_FLAG = 1;
     private static final byte IMMEDIATE_FLAG = 2;
     public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
@@ -65,7 +65,7 @@
     }
 
 
-    public synchronized ContentHeaderBody getContentHeaderBody()
+    public ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;
     }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 43c26f1..2f1c6dd 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -346,8 +346,7 @@
 
     }
 
-    public synchronized void populatePropertiesFromBuffer(QpidByteBuffer buffer, int propertyFlags, int size) throws
-                                                                                                              AMQFrameDecodingException
+    public BasicContentHeaderProperties(QpidByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
     {
         _propertyFlags = propertyFlags;
 
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
index d7a70d4..6adbce5 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
@@ -179,8 +179,7 @@
         {
             throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
         }
-        properties = new BasicContentHeaderProperties();
-        properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
+        properties = new BasicContentHeaderProperties(buffer, propertyFlags, (int)(size-14));
 
         if(!methodProcessor.ignoreAllButCloseOk())
         {
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
index 6eb7f8c..4d2dce4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
@@ -43,13 +43,12 @@
         BasicContentHeaderProperties properties;
         if (classId == BasicConsumeBody.CLASS_ID)
         {
-        	properties = new BasicContentHeaderProperties();
+            properties = new BasicContentHeaderProperties(buffer, propertyFlags, size);
         }
         else
         {
-        	throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
+            throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
         }
-        properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
         return properties;
     }
 }
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
index 251b4bd..3436e7f 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.protocol.v0_8.transport;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -88,7 +91,8 @@
     public void testPopulatePropertiesFromBuffer() throws Exception
     {
         QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[300]);
-        _testProperties.populatePropertiesFromBuffer(buf, 99, 99);
+        _testProperties.dispose();
+        _testProperties = new BasicContentHeaderProperties(buf, 99, 99);
     }
 
     @Test
@@ -205,4 +209,57 @@
         assertEquals(clusterId, _testProperties.getClusterIdAsString());
     }
 
+    private static final int BUFFER_SIZE = 1024 * 10;
+    private static final int POOL_SIZE = 20;
+    private static final double SPARSITY_FRACTION = 0.5;
+
+    @Test
+    public void testRellocate() throws Exception
+    {
+        try
+        {
+            QpidByteBuffer.deinitialisePool();
+            QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
+            try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE))
+            {
+                // set some test fields
+                _testProperties.setContentType("text/plain");
+                _testProperties.setUserId("test");
+                final Map<String, Object> headers = FieldTable.convertToMap(_testProperties.getHeaders());
+                final int propertyListSize = _testProperties.getPropertyListSize();
+                final int flags = _testProperties.getPropertyFlags();
+
+                // write at the buffer end
+                final int pos = BUFFER_SIZE - propertyListSize * 2;
+                buffer.position(pos);
+
+                try (QpidByteBuffer propertiesBuffer = buffer.view(0, propertyListSize))
+                {
+                    _testProperties.writePropertyListPayload(propertiesBuffer);
+                    propertiesBuffer.flip();
+
+                    BasicContentHeaderProperties testProperties = new BasicContentHeaderProperties(propertiesBuffer, flags, propertyListSize);
+                    FieldTable headersBeforeReallocation = testProperties.getHeaders();
+                    assertEquals("Unexpected headers",
+                                 headers,
+                                 FieldTable.convertToMap(headersBeforeReallocation));
+
+                    buffer.dispose();
+
+                    assertTrue("Properties buffer should be sparse", propertiesBuffer.isSparse());
+                    testProperties.reallocate();
+
+                    FieldTable headersAfterReallocation = testProperties.getHeaders();
+
+                    assertEquals("Unexpected headers after re-allocation",
+                                 headers,
+                                 FieldTable.convertToMap(headersAfterReallocation));
+                }
+            }
+        }
+        finally
+        {
+            QpidByteBuffer.deinitialisePool();
+        }
+    }
 }