QPID-8238: [Broker-J][AMQP 0-8..0-91] Optimize performance
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index e7d39ed..0351512 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -618,7 +618,8 @@
}
_receivedMessageCount.incrementAndGet();
- _receivedMessageSize.addAndGet(message.getSizeIncludingHeader());
+ long sizeIncludingHeader = message.getSizeIncludingHeader();
+ _receivedMessageSize.addAndGet(sizeIncludingHeader);
doRoute(message, routingAddress, instanceProperties, routingResult);
@@ -634,12 +635,12 @@
if (routingResult.hasRoutes())
{
_routedMessageCount.incrementAndGet();
- _routedMessageSize.addAndGet(message.getSizeIncludingHeader());
+ _routedMessageSize.addAndGet(sizeIncludingHeader);
}
else
{
_droppedMessageCount.incrementAndGet();
- _droppedMessageSize.addAndGet(message.getSizeIncludingHeader());
+ _droppedMessageSize.addAndGet(sizeIncludingHeader);
}
return routingResult;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index a15bdc1..c958876 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -49,10 +49,12 @@
return new Filterable()
{
+ private final AMQMessageHeader _messageHeader = message.getMessageHeader();
+
@Override
public AMQMessageHeader getMessageHeader()
{
- return message.getMessageHeader();
+ return _messageHeader;
}
@Override
@@ -88,49 +90,49 @@
@Override
public String getReplyTo()
{
- return message.getMessageHeader().getReplyTo();
+ return _messageHeader.getReplyTo();
}
@Override
public String getType()
{
- return message.getMessageHeader().getType();
+ return _messageHeader.getType();
}
@Override
public byte getPriority()
{
- return message.getMessageHeader().getPriority();
+ return _messageHeader.getPriority();
}
@Override
public String getMessageId()
{
- return message.getMessageHeader().getMessageId();
+ return _messageHeader.getMessageId();
}
@Override
public long getTimestamp()
{
- return message.getMessageHeader().getTimestamp();
+ return _messageHeader.getTimestamp();
}
@Override
public String getCorrelationId()
{
- return message.getMessageHeader().getCorrelationId();
+ return _messageHeader.getCorrelationId();
}
@Override
public long getExpiration()
{
- return message.getMessageHeader().getExpiration();
+ return _messageHeader.getExpiration();
}
@Override
public Object getHeader(String name)
{
- return message.getMessageHeader().getHeader(name);
+ return _messageHeader.getHeader(name);
}
@Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c618ccf..41b10ca 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -349,15 +349,11 @@
}
long bodySize = _currentMessage.getSize();
- long timestamp = contentHeader.getProperties().getTimestamp();
-
try
{
- final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
-
final MessageMetaData messageMetaData =
- new MessageMetaData(messagePublishInfo,
+ new MessageMetaData(info,
contentHeader,
getConnection().getLastReadTime());
@@ -381,8 +377,6 @@
_currentMessage = null;
- final boolean immediate = messagePublishInfo.isImmediate();
-
final InstanceProperties instanceProperties =
new InstanceProperties()
{
@@ -394,11 +388,11 @@
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
- return immediate;
+ return amqMessage.isImmediate();
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
- return messagePublishInfo.isMandatory();
+ return amqMessage.isMandatory();
case REDELIVERED:
return false;
}
@@ -411,7 +405,7 @@
amqMessage.getInitialRoutingAddress(),
instanceProperties);
- int enqueues = result.send(_transaction, immediate ? _immediateAction : null);
+ int enqueues = result.send(_transaction, amqMessage.isImmediate() ? _immediateAction : null);
if (enqueues == 0)
{
boolean mandatory = amqMessage.isMandatory();
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index faab357..7c663ff 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -62,13 +62,10 @@
public String getInitialRoutingAddress()
{
MessageMetaData messageMetaData = getMessageMetaData();
- if (messageMetaData != null)
+ AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+ if (routingKey != null)
{
- AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
- if (routingKey != null)
- {
- return routingKey.toString();
- }
+ return routingKey.toString();
}
return "";
}
@@ -115,7 +112,7 @@
@Override
public long getExpiration()
{
- return getMessageHeader().getExpiration();
+ return getMessageMetaData().getContentHeaderBody().getProperties().getExpiration();
}
@Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 1defedb..74b323b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -44,7 +44,7 @@
private final ContentHeaderBody _contentHeaderBody;
- private long _arrivalTime;
+ private final long _arrivalTime;
private static final byte MANDATORY_FLAG = 1;
private static final byte IMMEDIATE_FLAG = 2;
public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
@@ -65,7 +65,7 @@
}
- public synchronized ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 43c26f1..2f1c6dd 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -346,8 +346,7 @@
}
- public synchronized void populatePropertiesFromBuffer(QpidByteBuffer buffer, int propertyFlags, int size) throws
- AMQFrameDecodingException
+ public BasicContentHeaderProperties(QpidByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
{
_propertyFlags = propertyFlags;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
index d7a70d4..6adbce5 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
@@ -179,8 +179,7 @@
{
throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
}
- properties = new BasicContentHeaderProperties();
- properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
+ properties = new BasicContentHeaderProperties(buffer, propertyFlags, (int)(size-14));
if(!methodProcessor.ignoreAllButCloseOk())
{
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
index 6eb7f8c..4d2dce4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderPropertiesFactory.java
@@ -43,13 +43,12 @@
BasicContentHeaderProperties properties;
if (classId == BasicConsumeBody.CLASS_ID)
{
- properties = new BasicContentHeaderProperties();
+ properties = new BasicContentHeaderProperties(buffer, propertyFlags, size);
}
else
{
- throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
+ throw new AMQFrameDecodingException("Unsupported content header class id: " + classId, null);
}
- properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
return properties;
}
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
index 251b4bd..3436e7f 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderPropertiesTest.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8.transport;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
@@ -88,7 +91,8 @@
public void testPopulatePropertiesFromBuffer() throws Exception
{
QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[300]);
- _testProperties.populatePropertiesFromBuffer(buf, 99, 99);
+ _testProperties.dispose();
+ _testProperties = new BasicContentHeaderProperties(buf, 99, 99);
}
@Test
@@ -205,4 +209,57 @@
assertEquals(clusterId, _testProperties.getClusterIdAsString());
}
+ private static final int BUFFER_SIZE = 1024 * 10;
+ private static final int POOL_SIZE = 20;
+ private static final double SPARSITY_FRACTION = 0.5;
+
+ @Test
+ public void testRellocate() throws Exception
+ {
+ try
+ {
+ QpidByteBuffer.deinitialisePool();
+ QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
+ try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE))
+ {
+ // set some test fields
+ _testProperties.setContentType("text/plain");
+ _testProperties.setUserId("test");
+ final Map<String, Object> headers = FieldTable.convertToMap(_testProperties.getHeaders());
+ final int propertyListSize = _testProperties.getPropertyListSize();
+ final int flags = _testProperties.getPropertyFlags();
+
+ // write at the buffer end
+ final int pos = BUFFER_SIZE - propertyListSize * 2;
+ buffer.position(pos);
+
+ try (QpidByteBuffer propertiesBuffer = buffer.view(0, propertyListSize))
+ {
+ _testProperties.writePropertyListPayload(propertiesBuffer);
+ propertiesBuffer.flip();
+
+ BasicContentHeaderProperties testProperties = new BasicContentHeaderProperties(propertiesBuffer, flags, propertyListSize);
+ FieldTable headersBeforeReallocation = testProperties.getHeaders();
+ assertEquals("Unexpected headers",
+ headers,
+ FieldTable.convertToMap(headersBeforeReallocation));
+
+ buffer.dispose();
+
+ assertTrue("Properties buffer should be sparse", propertiesBuffer.isSparse());
+ testProperties.reallocate();
+
+ FieldTable headersAfterReallocation = testProperties.getHeaders();
+
+ assertEquals("Unexpected headers after re-allocation",
+ headers,
+ FieldTable.convertToMap(headersAfterReallocation));
+ }
+ }
+ }
+ finally
+ {
+ QpidByteBuffer.deinitialisePool();
+ }
+ }
}