blob: c48961d88b665e3c5d17e7206e9de87b9484db51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout = 20000)
public void testCloseConsumer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testCloseConsumerTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setRequestTimeout(500);
testPeer.expectBegin();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.expectDetach(true, false, true);
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
try {
consumer.close();
fail("Should have thrown a timed out exception");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught excpected exception", jmsEx);
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testRemotelyCloseConsumer() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final AtomicBoolean consumerClosed = new AtomicBoolean();
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
consumerClosed.set(true);
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a consumer, then remotely end it afterwards.
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
Queue queue = session.createQueue("myQueue");
final MessageConsumer consumer = session.createConsumer(queue);
// Verify the consumer gets marked closed
testPeer.waitForAllHandlersToComplete(1000);
assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
consumer.getMessageListener();
} catch (IllegalStateException jmsise) {
if (jmsise.getCause() != null) {
String message = jmsise.getCause().getMessage();
return message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
message.contains(BREAD_CRUMB);
} else {
return false;
}
}
return false;
}
}, 10000, 10));
assertTrue("Consumer closed callback didn't trigger", consumerClosed.get());
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.close();
}
}
/**
* Test that a message is received when calling receive with a timeout
* of 0, which means wait indefinitely.
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testReceiveMessageWithReceiveZeroTimeout() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(0);
assertNotNull("A message should have been recieved", receivedMessage);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
* Test that an Ack is not dropped when RTE is thrown from onMessage
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testRuntimeExceptionInOnMessageReleasesInAutoAckMode() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsReleasedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
throw new RuntimeException();
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
* Test that an Ack is not dropped when RTE is thrown from onMessage
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testRuntimeExceptionInOnMessageReleasesInDupsOkAckMode() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsReleasedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
throw new RuntimeException();
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout = 20000)
public void testCloseDurableTopicSubscriberDetachesWithCloseFalse() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
String subscriptionName = "mySubscription";
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlow();
TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName);
testPeer.expectDetach(false, true, false);
subscriber.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout=20000)
public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String topicName = "myTopic";
String subscriptionName = "mySubscription";
Topic topic = session.createTopic(topicName);
int messageCount = 5;
// Create a consumer and fill the prefetch with some messages,
// which we will consume some of but ack none of.
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
int consumeCount = 2;
for (int i = 1; i <= consumeCount; i++) {
Message receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
}
testPeer.expectDetach(false, true, false);
// Expect the messages that were not acked to be to be either
// modified or released depending on whether the app saw them
for (int i = 1; i <= consumeCount; i++) {
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
}
for (int i = consumeCount + 1 ; i <= messageCount; i++) {
testPeer.expectDisposition(true, new ReleasedMatcher());
}
testPeer.expectEnd();
consumer.close();
session.close();
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectClose();
connection.close();
}
}
@Test(timeout=20000)
public void testConsumerReceiveThrowsIfConnectionLost() throws Exception {
doConsumerReceiveThrowsIfConnectionLostTestImpl(false);
}
@Test(timeout=20000)
public void testConsumerTimedReceiveThrowsIfConnectionLost() throws Exception {
doConsumerReceiveThrowsIfConnectionLostTestImpl(true);
}
private void doConsumerReceiveThrowsIfConnectionLostTestImpl(boolean useTimeout) throws JMSException, Exception, IOException {
final CountDownLatch consumerReady = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.runAfterLastHandler(new AmqpPeerRunnable() {
@Override
public void run() {
try {
consumerReady.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting");
Thread.currentThread().interrupt();
}
}
});
testPeer.dropAfterLastHandler(10);
final MessageConsumer consumer = session.createConsumer(queue);
consumerReady.countDown();
try {
if (useTimeout) {
consumer.receive(100000);
} else {
consumer.receive();
}
fail("An exception should have been thrown");
} catch (JMSException jmse) {
// Expected
}
}
}
@Test(timeout=20000)
public void testConsumerReceiveNoWaitThrowsIfConnectionLost() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow(false, notNullValue(UnsignedInteger.class));
testPeer.expectLinkFlow(true, notNullValue(UnsignedInteger.class));
testPeer.dropAfterLastHandler();
final MessageConsumer consumer = session.createConsumer(queue);
try {
consumer.receiveNoWait();
fail("An exception should have been thrown");
} catch (JMSException jmse) {
// Expected
}
}
}
@Test(timeout=20000)
public void testSetMessageListenerAfterStartAndSend() throws Exception {
final int messageCount = 4;
final CountDownLatch latch = new CountDownLatch(messageCount);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < messageCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
LOG.debug("Async consumer got Message: {}", m);
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch incoming = new CountDownLatch(1);
Connection connection = testFixture.establishConnecton(testPeer);
// Allow wait for incoming message before we call receive.
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
incoming.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message to arrive then try and receive it, which should not happen
// since the connection is not started.
assertTrue(incoming.await(10, TimeUnit.SECONDS));
assertNull(consumer.receive(100));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testNoReceivedNoWaitMessagesWhenConnectionNotStarted() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch incoming = new CountDownLatch(1);
Connection connection = testFixture.establishConnecton(testPeer);
// Allow wait for incoming message before we call receive.
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
incoming.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message to arrive then try and receive it, which should not happen
// since the connection is not started.
assertTrue(incoming.await(10, TimeUnit.SECONDS));
assertNull(consumer.receiveNoWait());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=60000)
public void testSyncReceiveFailsWhenListenerSet() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
LOG.warn("Async consumer got unexpected Message: {}", m);
}
});
try {
consumer.receive();
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
try {
consumer.receive(1000);
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
try {
consumer.receiveNoWait();
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout = 20000)
public void testCreateProducerInOnMessage() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(getTestName());
final Queue outboud = session.createQueue(getTestName() + "-FowardDest");
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectSenderAttach();
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
testPeer.expectDisposition(true, new AcceptedMatcher());
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
LOG.debug("Async consumer got Message: {}", message);
try {
LOG.debug("Received async message: {}", message);
MessageProducer producer = session.createProducer(outboud);
producer.send(message);
producer.close();
LOG.debug("forwarded async message: {}", message);
} catch (Throwable e) {
LOG.debug("Caught exception: {}", e);
throw new RuntimeException(e.getMessage());
}
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=30000)
public void testReceiveWithTimoutDrainsOnNoMessage() throws IOException, Exception {
doDrainOnNoMessageTestImpl(false, true);
}
@Test(timeout=30000)
public void testReceiveWithTimoutDoesntDrainOnNoMessageWithLocalOnlyOption() throws IOException, Exception {
doDrainOnNoMessageTestImpl(true, true);
}
@Test(timeout=30000)
public void testReceiveNoWaitDrainsOnNoMessage() throws IOException, Exception {
doDrainOnNoMessageTestImpl(false, false);
}
@Test(timeout=30000)
public void testReceiveNoWaitDoesntDrainOnNoMessageWithLocalOnlyOption() throws IOException, Exception {
doDrainOnNoMessageTestImpl(true, false);
}
private void doDrainOnNoMessageTestImpl(boolean localCheckOnly, boolean noWait) throws JMSException, InterruptedException, Exception, IOException {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = null;
if (localCheckOnly) {
if (noWait) {
connection = testFixture.establishConnecton(testPeer, "?jms.receiveNoWaitLocalOnly=true");
} else {
connection = testFixture.establishConnecton(testPeer, "?jms.receiveLocalOnly=true");
}
} else {
connection = testFixture.establishConnecton(testPeer);
}
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expect receiver link attach and send credit
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
if (!localCheckOnly) {
// If not doing local-check only, expect the credit to be drained
// and then replenished to open the window again
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
}
MessageConsumer consumer = session.createConsumer(queue);
Message msg = null;
if (noWait) {
msg = consumer.receiveNoWait();
} else {
msg = consumer.receive(1);
}
assertNull(msg);
// Then close the consumer
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(false, false);
}
@Test(timeout=30000)
public void testReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(true, false);
}
@Test(timeout=30000)
public void testDurableReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(false, true);
}
@Test(timeout=30000)
public void testDurableReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(true, true);
}
private void doDrainWithNoResponseOnNoMessageTestImpl(boolean noWait, boolean durable) throws JMSException, InterruptedException, Exception, IOException {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = null;
connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=500");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
// Expect receiver link attach and send credit
if (durable) {
testPeer.expectDurableSubscriberAttach(topic.getTopicName(), getTestName());
} else {
testPeer.expectReceiverAttach();
}
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH)));
// Expect drain but do not respond so that the consumer times out.
testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH)));
// Consumer should close due to timed waiting for drain.
if (durable) {
testPeer.expectDetach(false, true, true);
} else {
testPeer.expectDetach(true, true, true);
}
final MessageConsumer consumer;
if (durable) {
consumer = session.createDurableSubscriber(topic, getTestName());
} else {
consumer = session.createConsumer(topic);
}
try {
if (noWait) {
consumer.receiveNoWait();
} else {
consumer.receive(1);
}
fail("Drain timeout should have aborted the receive.");
} catch (JMSException ex) {
LOG.info("Receive failed after drain timeout as expected: {}", ex.getMessage());
}
assertTrue("Consumer should close", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
consumer.getMessageSelector();
return false;
} catch (JMSException ex) {
return true;
}
}
}));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
/* Check the clients view of the remaining credit stays in sync with the transports
* even in the face of the remote peer advancing the delivery count unexpectedly,
* ensuring the client doesn't later think there is credit when there is none.
*
* QPIDJMS-139 / QPID-6947 / PROTON-1077
*
* Expect receiver link to attach and send credit, then:
* - Have test peer sender pretend it got multiple flows, and only some of the credit had previously arrived.
* - Unexpectedly advance delivery count and use up some of credit, set a 0 link credit.
* - Top up the link credit with the remainder as if it had just arrived, not advancing delivery count.
* - Send messages using up the remaining credit.
* - Check when the consumer tops the credit back up to the initial value that the
* value of link-credit on the wire is actually as expected.
*/
@Test(timeout=30000)
public void testCreditReplenishmentWhenSenderAdvancesDeliveryCountUnexpectedly() throws Exception {
int prefetch = 4;
int topUp = 2;
int messageCount = prefetch - topUp;
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(prefetch, topUp, messageCount);
// Expect consumer to top up the credit window to <prefetch> when accepting the messages
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(prefetch)));
testPeer.expectDisposition(true, new AcceptedMatcher(), 0, 0);
testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
// First timeout used is large to ensure the client sees the
// delivery count advancement without trying to drain first.
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(10000);
assertNotNull("Should have received message 1", msg);
msg = consumer.receive(1);
assertNotNull("Should have received message 2", msg);
// Then close the consumer
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsConnectionCloseThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
connection.close();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
assertNotNull(asyncError.get());
assertTrue(asyncError.get() instanceof IllegalStateException);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
connection.stop();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
assertNotNull("Expected IllegalStateException", asyncError.get());
assertTrue(asyncError.get() instanceof IllegalStateException);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsSessionCloseThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
session.close();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
Exception ex = asyncError.get();
assertNotNull("Expected an exception", ex);
boolean expectedType = ex instanceof IllegalStateException;
if(!expectedType) {
LOG.error("Unexpected exception type", ex);
}
assertTrue("Got unexpected exception type: " + ex, expectedType);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
}