QPID-7752: Producer message flow control for AMQP 1.0 should not effect flow of transactions
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index fd19d73..abc82a9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -45,7 +45,6 @@
{
private final SectionDecoder _sectionDecoder;
private UnsignedInteger _lastDeliveryId;
- private ReceivingDestination _receivingDestination;
private Map<Binary, Object> _unsettledMap = new LinkedHashMap<>();
private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>();
private boolean _creditWindow;
@@ -157,16 +156,6 @@
protected abstract Error messageTransfer(final Transfer transfer);
- public ReceivingDestination getReceivingDestination()
- {
- return _receivingDestination;
- }
-
- public void setDestination(final ReceivingDestination receivingDestination)
- {
- _receivingDestination = receivingDestination;
- }
-
@Override public void receiveFlow(final Flow flow)
{
setAvailable(flow.getAvailable());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 4ad15b8..9a39007 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1240,8 +1240,8 @@
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
- if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
- && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+ if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+ && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(true);
}
@@ -1280,8 +1280,8 @@
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
- if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
- && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+ if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+ && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
@@ -1311,7 +1311,7 @@
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
- if (linkEndpoint instanceof AbstractReceivingLinkEndpoint)
+ if (linkEndpoint instanceof StandardReceivingLinkEndpoint)
{
linkEndpoint.setStopped(true);
}
@@ -1343,8 +1343,8 @@
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
- if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
- && !_blockingEntities.contains(((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+ if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+ && !_blockingEntities.contains(((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
@@ -1631,10 +1631,9 @@
}
else
{
- if (endpoint.getRole() == Role.RECEIVER
+ if (endpoint instanceof StandardReceivingLinkEndpoint
&& (_blockingEntities.contains(Session_1_0.this)
- || (endpoint instanceof StandardReceivingLinkEndpoint
- && _blockingEntities.contains(((AbstractReceivingLinkEndpoint) endpoint).getReceivingDestination()))))
+ || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination())))
{
endpoint.setStopped(true);
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 339a167..53fedb9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -77,6 +77,7 @@
private boolean _resumedMessage;
private Binary _messageDeliveryTag;
private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
+ private ReceivingDestination _receivingDestination;
public StandardReceivingLinkEndpoint(final Session_1_0 session,
final Link_1_0<Source, Target> link)
@@ -494,6 +495,16 @@
_localUnsettled = new HashMap(_unsettledMap);
}
+ public ReceivingDestination getReceivingDestination()
+ {
+ return _receivingDestination;
+ }
+
+ public void setDestination(final ReceivingDestination receivingDestination)
+ {
+ _receivingDestination = receivingDestination;
+ }
+
@Override
protected void recoverLink(final Attach attach) throws AmqpErrorException
{