QPIDJMS-221 Ensure pending consumer requests are unblocked on close

When the consumer becomes closed or it's parent session or connection
does it should unblock any pending drain or stop requests.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 30e1cef..02d9b47 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -572,6 +572,18 @@
         JmsConsumerInfo consumerInfo = getResourceInfo();
 
         subTracker.consumerRemoved(consumerInfo);
+
+        // When closed we need to release any pending tasks to avoid blocking
+
+        if (stopRequest != null) {
+            stopRequest.onSuccess();
+            stopRequest = null;
+        }
+
+        if (pullRequest != null) {
+            pullRequest.onSuccess();
+            pullRequest = null;
+        }
     }
 
     //----- Inner classes used in message pull operations --------------------//
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index 29a1d46..24e133b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -25,7 +25,10 @@
 
 import java.util.Date;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -89,6 +92,9 @@
             assertTrue(m instanceof TextMessage);
             assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -123,6 +129,9 @@
             assertTrue(m instanceof TextMessage);
             assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -187,4 +196,80 @@
         }
     }
 
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(0);
+    }
+
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveTimedUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(1);
+    }
+
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveNoWaitUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(-1);
+    }
+
+    public void doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(final int timeout) throws Exception {
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Expect that once receive is called, it drains with 1 credit, don't answer it
+            if (timeout < 0) {
+                testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE));
+            } else {
+                testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
+            }
+
+            final AtomicBoolean error = new AtomicBoolean(false);
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        if (timeout < 0) {
+                            consumer.receiveNoWait();
+                        } else if (timeout == 0) {
+                            consumer.receive();
+                        } else {
+                            consumer.receive(10000);
+                        }
+                    } catch (Exception ex) {
+                        error.set(true);
+                    } finally {
+                        done.countDown();
+                    }
+                }
+            });
+
+            testPeer.waitForAllHandlersToComplete(3000);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            session.close();
+
+            assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS));
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }