QPID-7752: Producer message flow control for AMQP 1.0 should not effect flow of transactions
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index fd19d73..abc82a9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -45,7 +45,6 @@
 {
     private final SectionDecoder _sectionDecoder;
     private UnsignedInteger _lastDeliveryId;
-    private ReceivingDestination _receivingDestination;
     private Map<Binary, Object> _unsettledMap = new LinkedHashMap<>();
     private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>();
     private boolean _creditWindow;
@@ -157,16 +156,6 @@
 
     protected abstract Error messageTransfer(final Transfer transfer);
 
-    public ReceivingDestination getReceivingDestination()
-    {
-        return _receivingDestination;
-    }
-
-    public void setDestination(final ReceivingDestination receivingDestination)
-    {
-        _receivingDestination = receivingDestination;
-    }
-
     @Override public void receiveFlow(final Flow flow)
     {
         setAvailable(flow.getAvailable());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 4ad15b8..9a39007 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1240,8 +1240,8 @@
 
             for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
             {
-                if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
-                    && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+                if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+                    && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
                 {
                     linkEndpoint.setStopped(true);
                 }
@@ -1280,8 +1280,8 @@
             }
             for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
             {
-                if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
-                        && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+                if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+                        && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
                 {
                     linkEndpoint.setStopped(false);
                 }
@@ -1311,7 +1311,7 @@
 
             for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
             {
-                if (linkEndpoint instanceof AbstractReceivingLinkEndpoint)
+                if (linkEndpoint instanceof StandardReceivingLinkEndpoint)
                 {
                     linkEndpoint.setStopped(true);
                 }
@@ -1343,8 +1343,8 @@
             }
             for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
             {
-                if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
-                    && !_blockingEntities.contains(((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
+                if (linkEndpoint instanceof StandardReceivingLinkEndpoint
+                    && !_blockingEntities.contains(((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
                 {
                     linkEndpoint.setStopped(false);
                 }
@@ -1631,10 +1631,9 @@
                     }
                     else
                     {
-                        if (endpoint.getRole() == Role.RECEIVER
+                        if (endpoint instanceof StandardReceivingLinkEndpoint
                             && (_blockingEntities.contains(Session_1_0.this)
-                                || (endpoint instanceof StandardReceivingLinkEndpoint
-                                    && _blockingEntities.contains(((AbstractReceivingLinkEndpoint) endpoint).getReceivingDestination()))))
+                                || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination())))
                         {
                             endpoint.setStopped(true);
                         }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 339a167..53fedb9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -77,6 +77,7 @@
     private boolean _resumedMessage;
     private Binary _messageDeliveryTag;
     private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
+    private ReceivingDestination _receivingDestination;
 
     public StandardReceivingLinkEndpoint(final Session_1_0 session,
                                          final Link_1_0<Source, Target> link)
@@ -494,6 +495,16 @@
         _localUnsettled = new HashMap(_unsettledMap);
     }
 
+    public ReceivingDestination getReceivingDestination()
+    {
+        return _receivingDestination;
+    }
+
+    public void setDestination(final ReceivingDestination receivingDestination)
+    {
+        _receivingDestination = receivingDestination;
+    }
+
     @Override
     protected void recoverLink(final Attach attach) throws AmqpErrorException
     {