QPID-6028: [Java Broker] ManagementNodeConsumer must take a message reference whilst it retains responsibility for the message - fixes NPE exposed by r1776037
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 9e787b2..a567235 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -89,7 +89,9 @@
if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- return new MessageContainer(managementResponse, null, false);
+ return new MessageContainer(managementResponse,
+ managementResponse.getMessageReference(),
+ false);
}
}
return null;
@@ -197,7 +199,7 @@
return _managementNode;
}
- void send(ManagementResponse responseEntry)
+ private void send(ManagementResponse responseEntry)
{
_queue.add(responseEntry);
_target.notifyWork();
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 8a1ebf6..bc1bd81 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -24,6 +24,7 @@
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -35,16 +36,19 @@
class ManagementResponse implements MessageInstance
{
private final ManagementNodeConsumer _consumer;
+ private final MessageReference _messageReference;
private int _deliveryCount;
private boolean _isRedelivered;
private boolean _isDelivered;
private boolean _isDeleted;
private InternalMessage _message;
- ManagementResponse(final ManagementNodeConsumer consumer, final InternalMessage message)
+ ManagementResponse(final ManagementNodeConsumer consumer,
+ final InternalMessage message)
{
_consumer = consumer;
_message = message;
+ _messageReference = _message.newReference(consumer);
}
@Override
@@ -253,4 +257,9 @@
{
return _consumer.getManagementNode();
}
+
+ public MessageReference getMessageReference()
+ {
+ return _messageReference;
+ }
}