QPID-8212: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that consumer close does not delay concurrent connection close
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5d97a8b..ffb01d8 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3751,6 +3751,11 @@
         return super.isClosed() || _connection.isClosed();
     }
 
+    public boolean isSessionClosed()
+    {
+        return super.isClosed();
+    }
+
     /**
      * Checks if the Session and its parent connection are capable of performing
      * closing operations
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 9b5c6a8..3986bb3 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -545,7 +545,7 @@
     private boolean isClosedForInput(final int channelId)
     {
         AMQSession session;
-        return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isClosed());
+        return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isSessionClosed());
     }
 
 }
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
index ce62831..aec4316 100644
--- a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
@@ -20,19 +20,32 @@
  */
 package org.apache.qpid.systest.connection;
 
+import static org.apache.qpid.systest.core.util.Utils.INDEX;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.Enumeration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.QueueSession;
 import javax.jms.TopicSession;
 
@@ -49,6 +62,7 @@
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.systest.core.BrokerAdmin;
 import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.Utils;
 
 public class ConnectionTest extends JmsTestBase
 {
@@ -323,4 +337,89 @@
         assertNotNull("JMSXPropertyNames unexpectedly null", names);
         assertTrue("JMSXPropertyNames should have at least one name", names.hasMoreElements());
     }
+
+    @Test
+    public void testCloseWhenMessageListenerReceivesMessageAndClosesConsumer() throws Exception
+    {
+        final Connection connection =  getConnection();
+        try
+        {
+            final javax.jms.Session session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = session.createQueue(getTestQueueName());
+            final MessageConsumer consumer = session.createConsumer(destination);
+            Utils.sendMessages(session, destination, 2);
+
+            final CountDownLatch messageReceivingLatch = new CountDownLatch(1);
+            final CountDownLatch connectionCloseLatch = new CountDownLatch(1);
+            final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
+            consumer.setMessageListener(new MessageListener()
+            {
+                @Override
+                public void onMessage(final Message message)
+                {
+                    messageReceivingLatch.countDown();
+                    try
+                    {
+                        connectionCloseLatch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+                        consumer.close();
+                        // imitate slow message handling
+                        Thread.sleep(getReceiveTimeout());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                    catch (Exception e)
+                    {
+                        unexpectedException.set(e);
+                    }
+                }
+            });
+            connection.start();
+            messageReceivingLatch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+
+            final ExecutorService service = Executors.newSingleThreadExecutor();
+            try
+            {
+                Future closeFuture = service.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        connectionCloseLatch.countDown();
+                        connection.close();
+                        return null;
+                    }
+                });
+
+
+                closeFuture.get(getReceiveTimeout() * 4, TimeUnit.MILLISECONDS);
+            }
+            finally
+            {
+                service.shutdown();
+            }
+            assertNull("Unexpected acknowledge exception", unexpectedException.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+
+        final Connection connection2 =  getConnection();
+        try
+        {
+            final javax.jms.Session session = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = session.createQueue(getTestQueueName());
+            final MessageConsumer consumer = session.createConsumer(destination);
+            connection2.start();
+            final Message message = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message is not received", message);
+            assertEquals("Unexpected message received", 1, message.getIntProperty(INDEX));
+        }
+        finally
+        {
+            connection2.close();
+        }
+    }
 }