blob: 5bbfff94e03738dbbf0ee84bf23b710d68fc0f01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.integration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.proton.amqp.DescribedType;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for expected behaviors of JMS Connection Consumer implementation.
*/
public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionConsumerIntegrationTest.class);
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
/**
 * Creates a connection consumer on a freshly established connection and then
 * closes it, checking the frames the test peer receives along the way.
 */
@Test(timeout = 20000)
public void testCreateConnectionConsumer() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final JmsServerSessionPool pool = new JmsServerSessionPool();
        final Connection connection = testFixture.establishConnecton(peer);

        // Only a receiver link is expected: no Session is created for a
        // Connection Consumer, hence no additional Begin.
        peer.expectReceiverAttach();
        peer.expectLinkFlow();

        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(new JmsQueue("myQueue"), null, pool, 100);

        peer.expectDetach(true, true, true);
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/** Dispatch scenario in which the connection is started before the connection consumer is created. */
@Test(timeout = 20000)
public void testConnectionConsumerDispatchesToSessionConnectionStartedBeforeCreate() throws Exception {
    final boolean startBeforeCreate = true;
    doTestConnectionConsumerDispatchesToSession(startBeforeCreate);
}
/** Dispatch scenario in which the connection is started only after the connection consumer is created. */
@Test(timeout = 20000)
public void testConnectionConsumerDispatchesToSessionConnectionStartedAfterCreate() throws Exception {
    final boolean startBeforeCreate = false;
    doTestConnectionConsumerDispatchesToSession(startBeforeCreate);
}
/**
 * Shared body of the dispatch tests: a single transfer sent by the peer must
 * reach the MessageListener of the session handed out by the ServerSessionPool.
 *
 * @param startBeforeCreate
 *        when true the connection is started before the connection consumer is
 *        created, otherwise it is started right after the consumer is created.
 * @throws Exception if any step of the scenario fails.
 */
private void doTestConnectionConsumerDispatchesToSession(boolean startBeforeCreate) throws Exception {
    final CountDownLatch delivered = new CountDownLatch(1);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(peer);

        if (startBeforeCreate) {
            connection.start();
        }

        // The session backing the ServerSessionPool; its listener just signals the latch.
        peer.expectBegin();
        final Session pooledSession = connection.createSession();
        pooledSession.setMessageListener(message -> delivered.countDown());

        final JmsServerSession wrappedSession = new JmsServerSession(pooledSession);
        final JmsServerSessionPool pool = new JmsServerSessionPool(wrappedSession);

        // The peer answers the consumer's flow with one transfer, which is then
        // expected to be accepted and settled once dispatched.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, pool, 100);

        if (!startBeforeCreate) {
            connection.start();
        }

        final boolean arrived = delivered.await(10, TimeUnit.SECONDS);
        assertTrue("Message didn't arrive in time", arrived);

        peer.expectDetach(true, true, true);
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Delivers one message to a listener that briefly pauses inside onMessage, then
 * closes the connection consumer, to check for a race between the consumer close
 * and the delivering session's accept of the message.
 */
@Test(timeout = 20000)
public void testPauseInOnMessageAndConsumerClosed() throws Exception {
    final CountDownLatch messageArrived = new CountDownLatch(1);

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        Connection connection = testFixture.establishConnecton(testPeer);

        testPeer.expectBegin();

        // Create a session for our ServerSessionPool to use
        Session session = connection.createSession();
        session.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                messageArrived.countDown();

                LOG.trace("Pausing onMessage to check for race on connection consumer close");

                // Pause a bit to see if we race consumer close and our own
                // message accept attempt by the delivering Session.
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag so the
                    // thread running this listener can still observe it.
                    Thread.currentThread().interrupt();
                }

                LOG.trace("Paused onMessage to check for race on connection consumer close");
            }
        });

        JmsServerSession serverSession = new JmsServerSession(session);
        JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);

        // Now the Connection consumer arrives and we give it a message
        // to be dispatched to the server session.
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        Queue queue = new JmsQueue("myQueue");
        ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);

        connection.start();

        assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));

        // Close while the listener may still be inside its pause.
        testPeer.expectDetach(true, true, true);
        consumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With the connection never started, a transfer sent by the peer must not reach
 * the session's MessageListener; on consumer close the peer is told to expect a
 * released-and-settled disposition for it.
 */
@Test(timeout = 20000)
public void testNonStartedConnectionConsumerDoesNotDispatch() throws Exception {
    final CountDownLatch delivered = new CountDownLatch(1);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(peer);

        // The session backing the ServerSessionPool; its listener just signals the latch.
        peer.expectBegin();
        final Session pooledSession = connection.createSession();
        pooledSession.setMessageListener(message -> delivered.countDown());

        final JmsServerSession wrappedSession = new JmsServerSession(pooledSession);
        final JmsServerSessionPool pool = new JmsServerSessionPool(wrappedSession);

        // The peer answers the consumer's flow with one transfer.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, pool, 100);

        // connection.start() is deliberately never called in this test.
        final boolean arrived = delivered.await(500, TimeUnit.MILLISECONDS);
        assertFalse("Message Arrived unexpectedly", arrived);

        peer.expectDetach(true, true, true);
        peer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Has the peer send a batch of messages while the connection is stopped, checks
 * that none reached the session listener, then starts the connection and waits
 * for every message to be delivered.
 */
@Test(timeout = 20000)
public void testQueuedMessagesAreDrainedToServerSession() throws Exception {
    final int messageCount = 10;
    final CountDownLatch inboundSeen = new CountDownLatch(messageCount);
    final CountDownLatch listenerSeen = new CountDownLatch(messageCount);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final JmsConnection connection = (JmsConnection) testFixture.establishConnecton(peer);

        // Counts each inbound message reported to the connection listener.
        connection.addConnectionListener(new JmsDefaultConnectionListener() {

            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {
                inboundSeen.countDown();
            }
        });

        // The session backing the ServerSessionPool; its listener counts deliveries.
        peer.expectBegin();
        final Session pooledSession = connection.createSession();
        pooledSession.setMessageListener(message -> listenerSeen.countDown());

        final JmsServerSession wrappedSession = new JmsServerSession(pooledSession);
        final JmsServerSessionPool pool = new JmsServerSessionPool(wrappedSession);

        // The peer answers the consumer's flow with the whole batch of transfers,
        // one accepted-and-settled disposition being expected per message.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody, messageCount);
        for (int pending = messageCount; pending > 0; pending--) {
            peer.expectDispositionThatIsAcceptedAndSettled();
        }

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, pool, 100);

        // Everything has reached the client, yet nothing was handed to the listener.
        final boolean allInbound = inboundSeen.await(10, TimeUnit.SECONDS);
        assertTrue("Message didn't arrive in time", allInbound);
        assertEquals(messageCount, listenerSeen.getCount());

        connection.start();

        final boolean allDelivered = listenerSeen.await(10, TimeUnit.SECONDS);
        assertTrue("Message didn't arrive in time", allDelivered);

        peer.expectDetach(true, true, true);
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Same flow as the queued-messages drain test, but using a ServerSessionPool whose
 * first getServerSession() call returns null; every message must still end up
 * delivered to the session listener.
 */
@Test(timeout = 20000)
public void testConsumerRecoversAfterSessionPoolReturnsNullSession() throws Exception {
    final int messageCount = 10;
    final CountDownLatch inboundSeen = new CountDownLatch(messageCount);
    final CountDownLatch listenerSeen = new CountDownLatch(messageCount);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final JmsConnection connection = (JmsConnection) testFixture.establishConnecton(peer);

        // Counts each inbound message reported to the connection listener.
        connection.addConnectionListener(new JmsDefaultConnectionListener() {

            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {
                inboundSeen.countDown();
            }
        });

        // The session backing the ServerSessionPool; its listener counts deliveries.
        peer.expectBegin();
        final Session pooledSession = connection.createSession();
        pooledSession.setMessageListener(message -> listenerSeen.countDown());

        final JmsServerSession wrappedSession = new JmsServerSession(pooledSession);
        final JmsServerSessionPoolFirstAttemptGetsNull pool =
            new JmsServerSessionPoolFirstAttemptGetsNull(wrappedSession);

        // The peer answers the consumer's flow with the whole batch of transfers,
        // one accepted-and-settled disposition being expected per message.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody, messageCount);
        for (int pending = messageCount; pending > 0; pending--) {
            peer.expectDispositionThatIsAcceptedAndSettled();
        }

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, pool, 100);

        // Everything has reached the client, yet nothing was handed to the listener.
        final boolean allInbound = inboundSeen.await(10, TimeUnit.SECONDS);
        assertTrue("Message didn't arrive in time", allInbound);
        assertEquals(messageCount, listenerSeen.getCount());

        connection.start();

        final boolean allDelivered = listenerSeen.await(10, TimeUnit.SECONDS);
        assertTrue("Message didn't arrive in time", allDelivered);

        peer.expectDetach(true, true, true);
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Has the peer remotely detach the connection consumer's link with an error and
 * checks that the consumer then reports itself closed with that error as the
 * cause, that the connection's ExceptionListener fires, and that an explicit
 * close afterwards is accepted.
 */
@Test(timeout = 20000)
public void testRemotelyCloseConnectionConsumer() throws Exception {
    final String BREAD_CRUMB = "ErrorMessage";

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final CountDownLatch connectionError = new CountDownLatch(1);
        JmsServerSessionPool sessionPool = new JmsServerSessionPool();
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setExceptionListener(new ExceptionListener() {

            @Override
            public void onException(JMSException exception) {
                connectionError.countDown();
            }
        });

        // Create a consumer, then remotely end it afterwards.
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlow();
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);

        Queue queue = new JmsQueue("myQueue");
        final ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);

        // Verify the consumer gets marked closed
        testPeer.waitForAllHandlersToComplete(1000);
        assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisfied() throws Exception {
                try {
                    consumer.getServerSessionPool();
                } catch (IllegalStateException jmsise) {
                    LOG.debug("Error reported from consumer.getServerSessionPool()", jmsise);

                    // Both the cause and its message may be absent; treat that as
                    // "not satisfied yet" instead of failing with an unrelated NPE.
                    Throwable cause = jmsise.getCause();
                    String message = cause != null ? cause.getMessage() : null;

                    return message != null &&
                           message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
                           message.contains(BREAD_CRUMB);
                }

                // No exception means the consumer is not considered closed yet.
                return false;
            }
        }, 10000, 10));

        assertTrue("Consumer closed callback didn't trigger", connectionError.await(5, TimeUnit.SECONDS));

        // Try closing it explicitly, should effectively no-op in client.
        // The test peer will throw during close if it sends anything.
        consumer.close();

        testPeer.expectClose();
        connection.close();

        // Same final verification the other tests in this class perform.
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Uses a ServerSessionPool whose getServerSession() always throws and checks that
 * the connection's ExceptionListener is invoked once a message arrives.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
    final CountDownLatch listenerNotified = new CountDownLatch(1);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(peer);
        connection.setExceptionListener(exception -> listenerNotified.countDown());
        connection.start();

        final JmsFailingServerSessionPool failingPool = new JmsFailingServerSessionPool();

        // The peer answers the consumer's flow with one transfer.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, failingPool, 100);

        final boolean notified = listenerNotified.await(5, TimeUnit.SECONDS);
        assertTrue("Exception should have been fired", notified);

        peer.expectDetach(true, true, true);
        peer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Uses a ServerSession whose getSession() always throws and checks that the
 * connection's ExceptionListener is invoked once a message arrives.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnServerSessionFailure() throws Exception {
    final CountDownLatch listenerNotified = new CountDownLatch(1);

    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(peer);
        connection.setExceptionListener(exception -> listenerNotified.countDown());
        connection.start();

        final JmsFailingServerSession failingSession = new JmsFailingServerSession();
        final JmsServerSessionPool pool = new JmsServerSessionPool(failingSession);

        // The peer answers the consumer's flow with one transfer.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(destination, null, pool, 100);

        final boolean notified = listenerNotified.await(5, TimeUnit.SECONDS);
        assertTrue("Exception should have been fired", notified);

        peer.expectDetach(true, true, true);
        peer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
//----- Internal ServerSessionPool ---------------------------------------//
private class JmsFailingServerSessionPool implements ServerSessionPool {
public JmsFailingServerSessionPool() {
}
@Override
public ServerSession getServerSession() throws JMSException {
throw new JMSException("Something is wrong with me");
}
}
/**
 * ServerSessionPool that hands out the same single JmsServerSession on every
 * request.
 */
private class JmsServerSessionPool implements ServerSessionPool {

    private final JmsServerSession pooledSession;

    /** Creates a pool around a JmsServerSession built with its no-argument constructor. */
    public JmsServerSessionPool() {
        pooledSession = new JmsServerSession();
    }

    /**
     * Creates a pool around the given server session.
     *
     * @param serverSession the instance returned by every {@link #getServerSession()} call.
     */
    public JmsServerSessionPool(JmsServerSession serverSession) {
        pooledSession = serverSession;
    }

    /** @return the single server session this pool was created with. */
    @Override
    public ServerSession getServerSession() throws JMSException {
        return pooledSession;
    }
}
private class JmsServerSessionPoolFirstAttemptGetsNull implements ServerSessionPool {
private volatile boolean firstAttempt = true;
private JmsServerSession serverSession;
public JmsServerSessionPoolFirstAttemptGetsNull(JmsServerSession serverSession) {
this.serverSession = serverSession;
}
@Override
public ServerSession getServerSession() throws JMSException {
if (firstAttempt) {
firstAttempt = false;
return null;
} else {
return serverSession;
}
}
}
private class JmsServerSession implements ServerSession {
private final Session session;
private final ExecutorService runner = Executors.newSingleThreadExecutor();
public JmsServerSession() {
this.session = null;
}
public JmsServerSession(Session session) {
this.session = session;
}
@Override
public Session getSession() throws JMSException {
return session;
}
@Override
public void start() throws JMSException {
runner.execute(() -> {
session.run();
});
}
}
/**
 * JmsServerSession variant whose {@link #getSession()} always fails with a
 * JMSException.
 */
private class JmsFailingServerSession extends JmsServerSession {

    /** Builds the failing session via the superclass no-argument constructor. */
    public JmsFailingServerSession() {
        super();
    }

    /**
     * @return never returns normally.
     * @throws JMSException on every call.
     */
    @Override
    public Session getSession() throws JMSException {
        throw new JMSException("Something is wrong with me");
    }
}
}