QPID-8212: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that consumer close does not delay concurrent connection close
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5d97a8b..ffb01d8 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3751,6 +3751,11 @@
return super.isClosed() || _connection.isClosed();
}
+ public boolean isSessionClosed()
+ {
+ return super.isClosed();
+ }
+
/**
* Checks if the Session and its parent connection are capable of performing
* closing operations
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 9b5c6a8..3986bb3 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -545,7 +545,7 @@
private boolean isClosedForInput(final int channelId)
{
AMQSession session;
- return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isClosed());
+ return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isSessionClosed());
}
}
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
index ce62831..aec4316 100644
--- a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionTest.java
@@ -20,19 +20,32 @@
*/
package org.apache.qpid.systest.connection;
+import static org.apache.qpid.systest.core.util.Utils.INDEX;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.Enumeration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.QueueSession;
import javax.jms.TopicSession;
@@ -49,6 +62,7 @@
import org.apache.qpid.jms.Session;
import org.apache.qpid.systest.core.BrokerAdmin;
import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.Utils;
public class ConnectionTest extends JmsTestBase
{
@@ -323,4 +337,89 @@
assertNotNull("JMSXPropertyNames unexpectedly null", names);
assertTrue("JMSXPropertyNames should have at least one name", names.hasMoreElements());
}
+
+ @Test
+ public void testCloseWhenMessageListenerReceivesMessageAndClosesConsumer() throws Exception
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ final javax.jms.Session session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ final Destination destination = session.createQueue(getTestQueueName());
+ final MessageConsumer consumer = session.createConsumer(destination);
+ Utils.sendMessages(session, destination, 2);
+
+ final CountDownLatch messageReceivingLatch = new CountDownLatch(1);
+ final CountDownLatch connectionCloseLatch = new CountDownLatch(1);
+ final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
+ consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(final Message message)
+ {
+ messageReceivingLatch.countDown();
+ try
+ {
+ connectionCloseLatch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+ consumer.close();
+ // imitate slow message handling
+ Thread.sleep(getReceiveTimeout());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e)
+ {
+ unexpectedException.set(e);
+ }
+ }
+ });
+ connection.start();
+ messageReceivingLatch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+
+ final ExecutorService service = Executors.newSingleThreadExecutor();
+ try
+ {
+ Future closeFuture = service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ connectionCloseLatch.countDown();
+ connection.close();
+ return null;
+ }
+ });
+
+
+ closeFuture.get(getReceiveTimeout() * 4, TimeUnit.MILLISECONDS);
+ }
+ finally
+ {
+ service.shutdown();
+ }
+ assertNull("Unexpected acknowledge exception", unexpectedException.get());
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ final Connection connection2 = getConnection();
+ try
+ {
+ final javax.jms.Session session = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ final Destination destination = session.createQueue(getTestQueueName());
+ final MessageConsumer consumer = session.createConsumer(destination);
+ connection2.start();
+ final Message message = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message);
+ assertEquals("Unexpected message received", 1, message.getIntProperty(INDEX));
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
}