blob: e04b8f6465c54c3feea8b2a866b3a68de0253b00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(QpidJMSTestRunner.class)
public class ConsumerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout = 20000)
public void testCloseConsumer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testCloseConsumerTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setCloseTimeout(500);
testPeer.expectBegin();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.expectDetach(true, false, true);
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
try {
consumer.close();
fail("Should have thrown a timed out exception");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught excpected exception", jmsEx);
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testRemotelyCloseConsumer() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
CountDownLatch consumerClosed = new CountDownLatch(1);
CountDownLatch exceptionFired = new CountDownLatch(1);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exceptionFired.countDown();
}
});
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable exception) {
consumerClosed.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a consumer, then remotely end it afterwards.
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
Queue queue = session.createQueue("myQueue");
final MessageConsumer consumer = session.createConsumer(queue);
// Verify the consumer gets marked closed
testPeer.waitForAllHandlersToComplete(1000);
assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
consumer.getMessageListener();
} catch (IllegalStateException jmsise) {
if (jmsise.getCause() != null) {
String message = jmsise.getCause().getMessage();
return message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
message.contains(BREAD_CRUMB);
} else {
return false;
}
}
return false;
}
}, 10000, 10));
assertTrue("Consumer closed callback didn't trigger", consumerClosed.await(2000, TimeUnit.MILLISECONDS));
assertFalse("JMS Exception listener shouldn't fire with no MessageListener", exceptionFired.await(20, TimeUnit.MILLISECONDS));
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.close();
}
}
@Test(timeout = 20000)
public void testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
CountDownLatch consumerClosed = new CountDownLatch(1);
CountDownLatch exceptionFired = new CountDownLatch(1);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.trace("JMS ExceptionListener: ", exception);
exceptionFired.countDown();
}
});
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable exception) {
consumerClosed.countDown();
}
});
// Use a zero prefetch to allow setMessageListener to not race link close
((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a consumer, then remotely end it afterwards.
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB, 10);
Queue queue = session.createQueue("myQueue");
final MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
// Verify the consumer gets marked closed
testPeer.waitForAllHandlersToComplete(1000);
assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
consumer.getMessageListener();
} catch (IllegalStateException jmsise) {
if (jmsise.getCause() != null) {
String message = jmsise.getCause().getMessage();
return message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
message.contains(BREAD_CRUMB);
} else {
return false;
}
}
return false;
}
}, 10000, 10));
assertTrue("Consumer closed callback didn't trigger", consumerClosed.await(2000, TimeUnit.MILLISECONDS));
assertTrue("JMS Exception listener should have fired with a MessageListener", exceptionFired.await(2000, TimeUnit.MILLISECONDS));
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.close();
}
}
/**
* Test that a message is received when calling receive with a timeout
* of 0, which means wait indefinitely.
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testReceiveMessageWithReceiveZeroTimeout() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(0);
assertNotNull("A message should have been recieved", receivedMessage);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
* Test that an Ack is not dropped when RTE is thrown from onMessage
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testRuntimeExceptionInOnMessageReleasesInAutoAckMode() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsReleasedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
throw new RuntimeException();
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
* Test that an Ack is not dropped when RTE is thrown from onMessage
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testRuntimeExceptionInOnMessageReleasesInDupsOkAckMode() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsReleasedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
throw new RuntimeException();
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout = 20000)
public void testCloseDurableTopicSubscriberDetachesWithCloseFalse() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
String subscriptionName = "mySubscription";
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlow();
TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName);
testPeer.expectDetach(false, true, false);
subscriber.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout=20000)
public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String topicName = "myTopic";
String subscriptionName = "mySubscription";
Topic topic = session.createTopic(topicName);
int messageCount = 5;
// Create a consumer and fill the prefetch with some messages,
// which we will consume some of but ack none of.
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
int consumeCount = 2;
Message receivedMessage = null;
for (int i = 1; i <= consumeCount; i++) {
receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
}
// Expect the messages that were not delivered to be released.
for (int i = 1; i <= consumeCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
receivedMessage.acknowledge();
testPeer.expectDetach(false, true, false);
for (int i = consumeCount + 1 ; i <= messageCount; i++) {
testPeer.expectDisposition(true, new ReleasedMatcher());
}
testPeer.expectEnd();
consumer.close();
session.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testConsumerReceiveThrowsIfConnectionLost() throws Exception {
doConsumerReceiveThrowsIfConnectionLostTestImpl(false);
}
@Test(timeout=20000)
public void testConsumerTimedReceiveThrowsIfConnectionLost() throws Exception {
doConsumerReceiveThrowsIfConnectionLostTestImpl(true);
}
private void doConsumerReceiveThrowsIfConnectionLostTestImpl(boolean useTimeout) throws JMSException, Exception, IOException {
final CountDownLatch consumerReady = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.runAfterLastHandler(new AmqpPeerRunnable() {
@Override
public void run() {
try {
consumerReady.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting");
Thread.currentThread().interrupt();
}
}
});
testPeer.dropAfterLastHandler(10);
final MessageConsumer consumer = session.createConsumer(queue);
consumerReady.countDown();
try {
if (useTimeout) {
consumer.receive(100000);
} else {
consumer.receive();
}
fail("An exception should have been thrown");
} catch (JMSException jmse) {
// Expected
}
}
}
@Test(timeout=20000)
public void testConsumerReceiveNoWaitThrowsIfConnectionLost() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow(false, notNullValue(UnsignedInteger.class));
testPeer.expectLinkFlow(true, notNullValue(UnsignedInteger.class));
testPeer.dropAfterLastHandler();
final MessageConsumer consumer = session.createConsumer(queue);
try {
consumer.receiveNoWait();
fail("An exception should have been thrown");
} catch (JMSException jmse) {
// Expected
}
}
}
@Test(timeout=20000)
public void testSetMessageListenerAfterStartAndSend() throws Exception {
final int messageCount = 4;
final CountDownLatch latch = new CountDownLatch(messageCount);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < messageCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
LOG.debug("Async consumer got Message: {}", m);
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch incoming = new CountDownLatch(1);
Connection connection = testFixture.establishConnecton(testPeer);
// Allow wait for incoming message before we call receive.
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
incoming.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message to arrive then try and receive it, which should not happen
// since the connection is not started.
assertTrue(incoming.await(10, TimeUnit.SECONDS));
assertNull(consumer.receive(100));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testNoReceivedNoWaitMessagesWhenConnectionNotStarted() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch incoming = new CountDownLatch(1);
Connection connection = testFixture.establishConnecton(testPeer);
// Allow wait for incoming message before we call receive.
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
incoming.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message to arrive then try and receive it, which should not happen
// since the connection is not started.
assertTrue(incoming.await(10, TimeUnit.SECONDS));
assertNull(consumer.receiveNoWait());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=60000)
public void testSyncReceiveFailsWhenListenerSet() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
LOG.warn("Async consumer got unexpected Message: {}", m);
}
});
try {
consumer.receive();
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
try {
consumer.receive(1000);
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
try {
consumer.receiveNoWait();
fail("Should have thrown an exception.");
} catch (JMSException ex) {
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout = 20000)
public void testCreateProducerInOnMessage() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(getTestName());
final Queue outboud = session.createQueue(getTestName() + "-FowardDest");
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectSenderAttach();
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
testPeer.expectDisposition(true, new AcceptedMatcher());
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
LOG.debug("Async consumer got Message: {}", message);
try {
LOG.debug("Received async message: {}", message);
MessageProducer producer = session.createProducer(outboud);
producer.send(message);
producer.close();
LOG.debug("forwarded async message: {}", message);
} catch (Throwable e) {
LOG.debug("Caught exception: {}", e);
throw new RuntimeException(e.getMessage());
}
}
});
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=30000)
public void testReceiveWithTimoutDrainsOnNoMessage() throws IOException, Exception {
doDrainOnNoMessageTestImpl(false, true);
}
@Test(timeout=30000)
public void testReceiveWithTimoutDoesntDrainOnNoMessageWithLocalOnlyOption() throws IOException, Exception {
doDrainOnNoMessageTestImpl(true, true);
}
@Test(timeout=30000)
public void testReceiveNoWaitDrainsOnNoMessage() throws IOException, Exception {
doDrainOnNoMessageTestImpl(false, false);
}
@Test(timeout=30000)
public void testReceiveNoWaitDoesntDrainOnNoMessageWithLocalOnlyOption() throws IOException, Exception {
doDrainOnNoMessageTestImpl(true, false);
}
private void doDrainOnNoMessageTestImpl(boolean localCheckOnly, boolean noWait) throws JMSException, InterruptedException, Exception, IOException {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = null;
if (localCheckOnly) {
if (noWait) {
connection = testFixture.establishConnecton(testPeer, "?jms.receiveNoWaitLocalOnly=true");
} else {
connection = testFixture.establishConnecton(testPeer, "?jms.receiveLocalOnly=true");
}
} else {
connection = testFixture.establishConnecton(testPeer);
}
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expect receiver link attach and send credit
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
if (!localCheckOnly) {
// If not doing local-check only, expect the credit to be drained
// and then replenished to open the window again
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
}
MessageConsumer consumer = session.createConsumer(queue);
Message msg = null;
if (noWait) {
msg = consumer.receiveNoWait();
} else {
msg = consumer.receive(1);
}
assertNull(msg);
// Then close the consumer
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(false, false);
}
@Test(timeout=30000)
public void testReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(true, false);
}
@Test(timeout=30000)
public void testDurableReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(false, true);
}
@Test(timeout=30000)
public void testDurableReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception {
doDrainWithNoResponseOnNoMessageTestImpl(true, true);
}
private void doDrainWithNoResponseOnNoMessageTestImpl(boolean noWait, boolean durable) throws JMSException, InterruptedException, Exception, IOException {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = null;
connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=500");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
// Expect receiver link attach and send credit
if (durable) {
testPeer.expectDurableSubscriberAttach(topic.getTopicName(), getTestName());
} else {
testPeer.expectReceiverAttach();
}
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH)));
// Expect drain but do not respond so that the consumer times out.
testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_TOPIC_PREFETCH)));
// Consumer should close due to timed waiting for drain.
if (durable) {
testPeer.expectDetach(false, true, true);
} else {
testPeer.expectDetach(true, true, true);
}
final MessageConsumer consumer;
if (durable) {
consumer = session.createDurableSubscriber(topic, getTestName());
} else {
consumer = session.createConsumer(topic);
}
try {
if (noWait) {
consumer.receiveNoWait();
} else {
consumer.receive(1);
}
fail("Drain timeout should have aborted the receive.");
} catch (JMSException ex) {
LOG.info("Receive failed after drain timeout as expected: {}", ex.getMessage());
}
assertTrue("Consumer should close", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
consumer.getMessageSelector();
return false;
} catch (JMSException ex) {
return true;
}
}
}));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
/* Check the clients view of the remaining credit stays in sync with the transports
* even in the face of the remote peer advancing the delivery count unexpectedly,
* ensuring the client doesn't later think there is credit when there is none.
*
* QPIDJMS-139 / QPID-6947 / PROTON-1077
*
* Expect receiver link to attach and send credit, then:
* - Have test peer sender pretend it got multiple flows, and only some of the credit had previously arrived.
* - Unexpectedly advance delivery count and use up some of credit, set a 0 link credit.
* - Top up the link credit with the remainder as if it had just arrived, not advancing delivery count.
* - Send messages using up the remaining credit.
* - Check when the consumer tops the credit back up to the initial value that the
* value of link-credit on the wire is actually as expected.
*/
@Test(timeout=30000)
public void testCreditReplenishmentWhenSenderAdvancesDeliveryCountUnexpectedly() throws Exception {
int prefetch = 4;
int topUp = 2;
int messageCount = prefetch - topUp;
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(prefetch, topUp, messageCount);
// Expect consumer to top up the credit window to <prefetch-1> when accepting the first message, accounting
// for the fact the second prefetched message is still in its local buffer.
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(prefetch-1)));
testPeer.expectDisposition(true, new AcceptedMatcher(), 0, 0);
testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
// First timeout used is large to ensure the client sees the
// delivery count advancement without trying to drain first.
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(10000);
assertNotNull("Should have received message 1", msg);
msg = consumer.receive(1);
assertNotNull("Should have received message 2", msg);
// Then close the consumer
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsConnectionCloseThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
connection.close();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
assertNotNull(asyncError.get());
assertTrue(asyncError.get() instanceof IllegalStateException);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
connection.stop();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3, TimeUnit.MINUTES);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
assertNotNull("Expected IllegalStateException", asyncError.get());
assertTrue(asyncError.get() instanceof IllegalStateException);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testMessageListenerCallsSessionCloseThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
LOG.debug("Async consumer got Message: {}", m);
session.close();
} catch (Exception ex) {
asyncError.set(ex);
}
latch.countDown();
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
Exception ex = asyncError.get();
assertNotNull("Expected an exception", ex);
boolean expectedType = ex instanceof IllegalStateException;
if(!expectedType) {
LOG.error("Unexpected exception type", ex);
}
assertTrue("Got unexpected exception type: " + ex, expectedType);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testMessageListenerClosesItsConsumer() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch exceptionListenerFired = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.trace("JMS ExceptionListener: ", exception);
exceptionListenerFired.countDown();
}
});
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
testPeer.expectDisposition(true, new AcceptedMatcher());
testPeer.expectDetach(true, true, true);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
consumer.close();
} catch (Throwable t) {
error.set(t);
LOG.error("Unexpected error during close", t);
}
latch.countDown();
LOG.debug("Async consumer got Message: {}", m);
}
});
assertTrue("Process not completed within given timeout", latch.await(3000, TimeUnit.MILLISECONDS));
assertNull("No error expected during close", error.get());
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
assertFalse("JMS Exception listener shouldn't have fired", exceptionListenerFired.await(20, TimeUnit.MILLISECONDS));
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Repeat(repetitions = 1)
@Test(timeout=20000)
public void testRecoverOrderingWithAsyncConsumer() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
final int recoverCount = 5;
final int messageCount = 8;
final int testPayloadLength = 255;
String payload = new String(new byte[testPayloadLength], StandardCharsets.UTF_8);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(payload),
messageCount, false, false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)), 1, true);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
boolean complete;
private int messageSeen = 0;
private int expectedIndex = 0;
@Override
public void onMessage(Message message) {
if (complete) {
LOG.debug("Ignoring message as test already completed (either pass or fail)");
return;
}
try {
int actualIndex = message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER);
LOG.debug("Got message {}", actualIndex);
assertEquals("Received Message Out Of Order", expectedIndex, actualIndex);
// don't ack the message until we receive it X times
if (messageSeen < recoverCount) {
LOG.debug("Ignoring message " + actualIndex + " and calling recover");
session.recover();
messageSeen++;
} else {
messageSeen = 0;
expectedIndex++;
// Have the peer expect the accept the disposition (1-based, hence pre-incremented).
testPeer.expectDisposition(true, new AcceptedMatcher(), expectedIndex, expectedIndex);
LOG.debug("Acknowledging message {}", actualIndex);
message.acknowledge();
//testPeer.waitForAllHandlersToComplete(2000);
if (expectedIndex == messageCount) {
complete = true;
latch.countDown();
}
}
} catch (Throwable t) {
complete = true;
asyncError.set(t);
latch.countDown();
}
}
});
boolean await = latch.await(15, TimeUnit.SECONDS);
assertTrue("Messages not received within given timeout." + latch.getCount(), await);
Throwable ex = asyncError.get();
assertNull("Unexpected exception", ex);
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testConsumerCloseWaitsForAsyncDeliveryToComplete() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
latch.countDown();
LOG.debug("Async consumer got Message: {}", m);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testSessionCloseWaitsForAsyncDeliveryToComplete() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
latch.countDown();
LOG.debug("Async consumer got Message: {}", m);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
testPeer.expectEnd();
session.close();
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testConnectionCloseWaitsForAsyncDeliveryToComplete() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
connection.start();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(destination);
testPeer.expectDisposition(true, new AcceptedMatcher());
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
latch.countDown();
LOG.debug("Async consumer got Message: {}", m);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
}
});
boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testCloseClientAckAsyncConsumerCanStillAckMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 100;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
int messageCount = 5;
int consumeCount = 5;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
final CountDownLatch consumedLatch = new CountDownLatch(consumeCount);
final AtomicReference<Message> receivedMessage = new AtomicReference<>();
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
receivedMessage.set(message);
consumedLatch.countDown();
}
});
assertTrue("Did not consume all messages", consumedLatch.await(10, TimeUnit.SECONDS));
// Expect the client to then drain off all credit from the link.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
// Close should be deferred as these messages were delivered but not acknowledged.
consumer.close();
testPeer.waitForAllHandlersToComplete(3000);
for (int i = 1; i <= consumeCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
// Now the consumer should close.
testPeer.expectDetach(true, true, true);
// Ack the last read message, which should accept all previous messages as well.
receivedMessage.get().acknowledge();
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testCloseClientAckSyncConsumerCanStillAckMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 100;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
int messageCount = 5;
int consumeCount = 3;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
final CountDownLatch expected = new CountDownLatch(messageCount);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected.countDown();
}
});
MessageConsumer consumer = session.createConsumer(queue);
Message receivedMessage = null;
for (int i = 1; i <= consumeCount; i++) {
receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
}
// Ensure all the messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
// Expect the client to then drain off all credit from the link.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
// Expect the prefetched messages to be released for dispatch elsewhere.
for (int i = 1; i <= messageCount - consumeCount; i++) {
testPeer.expectDisposition(true, new ReleasedMatcher());
}
// Close should be deferred as these messages were delivered but not acknowledged.
consumer.close();
testPeer.waitForAllHandlersToComplete(3000);
for (int i = 1; i <= consumeCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
// Now the consumer should close.
testPeer.expectDetach(true, true, true);
// Ack the last read message, which should accept all previous messages as well.
receivedMessage.acknowledge();
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testCloseConsumersWithDeferredAckHandledLaterWhenlastConsumedMessageIsAcked() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 10;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-1"),
1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 1, true);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-2"),
1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 2, true);
final CountDownLatch expected = new CountDownLatch(2);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected.countDown();
}
});
// These are our two consumers, the first gets a message and abandons it, the second will have
// acknowledge called on its message which will lead to the message for the first to be acknowledged
// and then it's link will be closed.
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
Message receivedMessage1 = null;
Message receivedMessage2 = null;
// Ensure all the messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
// Take our two messages from the queue leaving them in a delivered state.
receivedMessage1 = consumer1.receive(3000);
assertNotNull(receivedMessage1);
assertTrue(receivedMessage1 instanceof TextMessage);
receivedMessage2 = consumer2.receive(3000);
assertNotNull(receivedMessage2);
assertTrue(receivedMessage2 instanceof TextMessage);
// Expect the client to then drain off all credit from the link when "closed"
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
// Expect the client to then drain off all credit from the link when "closed"
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
// Close should be deferred as the messages were delivered but not acknowledged.
consumer1.close();
consumer2.close();
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectDisposition(true, new AcceptedMatcher());
testPeer.expectDisposition(true, new AcceptedMatcher());
// Now the links should close as we tear down the deferred consumers
testPeer.expectDetach(true, true, true);
testPeer.expectDetach(true, true, true);
// Acknowledge the last read message, which should accept all previous messages as well
// and our consumers should then close their links in turn.
receivedMessage2.acknowledge();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testConsumerWithDeferredCloseAcksAsClosed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 100;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
int messageCount = 5;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
final CountDownLatch expected = new CountDownLatch(messageCount);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected.countDown();
}
});
MessageConsumer consumer = session.createConsumer(queue);
int consumeCount = 3;
Message receivedMessage = null;
for (int i = 1; i <= consumeCount; i++) {
receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
}
// Ensure all the messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
// Expect the client to then drain off all credit from the link.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
// Expect the prefetched messages to be released for dispatch elsewhere.
for (int i = 1; i <= messageCount - consumeCount; i++) {
testPeer.expectDisposition(true, new ReleasedMatcher());
}
// Close should be deferred as these messages were delivered but not acknowledged.
consumer.close();
testPeer.waitForAllHandlersToComplete(3000);
// Should not be able to consume from the consumer once closed.
try {
consumer.receive();
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
try {
consumer.receive(100);
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
try {
consumer.receiveNoWait();
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
for (int i = 1; i <= consumeCount; i++) {
testPeer.expectDisposition(true, new AcceptedMatcher());
}
// Now the consumer should close.
testPeer.expectDetach(true, true, true);
// Ack the last read message, which should accept all previous messages as well.
receivedMessage.acknowledge();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testConsumerWithDeferredCloseAcksDeliveryFailedAsSessionClosed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 100;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
int messageCount = 5;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
final CountDownLatch expected = new CountDownLatch(messageCount);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected.countDown();
}
});
MessageConsumer consumer = session.createConsumer(queue);
int consumeCount = 3;
Message receivedMessage = null;
for (int i = 1; i <= consumeCount; i++) {
receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
}
// Ensure all the messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
// Expect the client to then drain off all credit from the link.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
// Expect the prefetched messages to be released for dispatch elsewhere.
testPeer.expectDisposition(true, new ReleasedMatcher(), 4, 4);
testPeer.expectDisposition(true, new ReleasedMatcher(), 5, 5);
// Close should be deferred as these messages were delivered but not acknowledged.
consumer.close();
testPeer.waitForAllHandlersToComplete(3000);
// Should not be able to consume from the consumer once closed.
try {
consumer.receive();
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
try {
consumer.receive(100);
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
try {
consumer.receiveNoWait();
fail("Should throw as this consumer is closed.");
} catch (IllegalStateException ise) {}
// Now the delivered messages should be acknowledged as session shuts down.
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 3, 3);
testPeer.expectDetach(true, true, true);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testDeferredCloseTimeoutAlertsExceptionListener() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch errorLatch = new CountDownLatch(1);
final int DEFAULT_PREFETCH = 100;
// Set to fixed known value to reduce breakage if defaults are changed.
Connection connection = testFixture.establishConnecton(testPeer,
"jms.closeTimeout=500&jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
if (exception instanceof JmsOperationTimedOutException) {
errorLatch.countDown();
}
}
});
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(queue);
Message receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
// Expect the client to then drain off all credit from the link.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
// Close should be deferred as these messages were delivered but not acknowledged.
consumer.close();
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectDisposition(true, new AcceptedMatcher());
// Now the consumer should close.
testPeer.expectDetach(true, false, true);
// Ack the read message, which should accept all previous messages as well.
receivedMessage.acknowledge();
assertTrue("Did not get timed out error", errorLatch.await(10, TimeUnit.SECONDS));
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testSessionCloseDoesNotDeferConsumerClose() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
MessageConsumer consumer = session.createConsumer(queue);
Message receivedMessage = consumer.receive(3000);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
testPeer.expectEnd();
session.close();
// Consumer and Session should be closed, not acknowledge allowed
try {
receivedMessage.acknowledge();
fail("Should not be allowed to call acknowledge after session closed.");
} catch (IllegalStateException ise) {
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedAccepted() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(1, false);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedRejected() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(2, false);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedReleased() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(3, false);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedModifiedFailed() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(4, false);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedModifiedFailedUndeliverable() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(5, false);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedAcceptedString() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(1, true);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedRejectedString() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(2, true);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedReleasedString() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(3, true);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedModifiedFailedString() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(4, true);
}
@Test(timeout=20000)
public void testRedeliveryPolicyOutcomeAppliedModifiedFailedUndeliverableString() throws Exception {
doTestRedeliveryPolicyOutcomeApplied(5, true);
}
private void doTestRedeliveryPolicyOutcomeApplied(int outcome, boolean useStringOption) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Object outcomeValue = outcome;
if (useStringOption) {
// ACCEPTED = 1
// REJECTED = 2
// RELEASED = 3
// MODIFIED_FAILED = 4
// MODIFIED_FAILED_UNDELIVERABLE = 5
switch (outcome) {
case 1:
outcomeValue = "ACCEPTED";
break;
case 2:
outcomeValue = "REJECTED";
break;
case 3:
outcomeValue = "RELEASED";
break;
case 4:
outcomeValue = "MODIFIED_FAILED";
break;
case 5:
outcomeValue = "MODIFIED_FAILED_UNDELIVERABLE";
break;
default:
throw new IllegalArgumentException("Test passed invalid outcome value");
}
}
Connection connection = testFixture.establishConnecton(testPeer,
"?jms.redeliveryPolicy.maxRedeliveries=1&jms.redeliveryPolicy.outcome=" + outcomeValue);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
HeaderDescribedType header = new HeaderDescribedType();
header.setDeliveryCount(new UnsignedInteger(2));
testPeer.expectReceiverAttach();
// Send some messages that have exceeded the specified re-delivery count
testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("redelivered-content"), 1);
// Send a message that has not exceeded the delivery count
String expectedContent = "not-redelivered";
testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(expectedContent), 2);
Matcher<?> outcomeMatcher = null;
// Expect a disposition matching the given outcome index:
//
// ACCEPTED = 1
// REJECTED = 2
// RELEASED = 3
// MODIFIED_FAILED = 4
// MODIFIED_FAILED_UNDELIVERABLE = 5
switch (outcome) {
case 1:
outcomeMatcher = new AcceptedMatcher();
break;
case 2:
outcomeMatcher = new RejectedMatcher();
break;
case 3:
outcomeMatcher = new ReleasedMatcher();
break;
case 4:
ModifiedMatcher failed = new ModifiedMatcher();
failed.withDeliveryFailed(equalTo(true));
outcomeMatcher = failed;
break;
case 5:
ModifiedMatcher undeliverable = new ModifiedMatcher();
undeliverable.withDeliveryFailed(equalTo(true));
undeliverable.withUndeliverableHere(equalTo(true));
outcomeMatcher = undeliverable;
break;
default:
throw new IllegalArgumentException("Test passed invalid outcome value");
}
// Expect a settled disposition matching the configured redelivery policy outcome
testPeer.expectDisposition(true, outcomeMatcher);
// Then expect an Accepted disposition for the good message
testPeer.expectDisposition(true, new AcceptedMatcher());
final MessageConsumer consumer = session.createConsumer(queue);
Message m = consumer.receive(6000);
assertNotNull("Should have reiceved the final message", m);
assertTrue("Should have received the final message", m instanceof TextMessage);
assertEquals("Unexpected content", expectedContent, ((TextMessage)m).getText());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testLinkCreditReplenishmentWithPrefetchFilled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int prefetch = 10;
int initialMessageCount = 10;
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
connection.start();
final CountDownLatch expected = new CountDownLatch(initialMessageCount);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
// Expect initial credit to be sent, respond with some messages using it all
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
initialMessageCount, false, false, equalTo(UnsignedInteger.valueOf(prefetch)), 1, false, false);
MessageConsumer consumer = session.createConsumer(queue);
// Ensure all the messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
// Now consume the first 2 messages, expect the ack processing NOT to provoke flowing more credit, as over
// 70% of the max potential prefetch (8 outstanding local messages) remains in play.
int consumed = 0;
for (consumed = 1; consumed <= 2; consumed ++) {
testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
Message message = consumer.receiveNoWait();
assertNotNull("Should have received a message " + consumed, message);
}
// Now consume 3rd message, expect the ack processing to provoke flowing more credit as it hits the
// 70% low threshhold (credit + prefetched) for replenishing the credit to max out the prefetch window
// again, accounting for the already-prefetched but still not yet consumed messages.
// Also have the peer send more messages using all the remaining credit granted.
consumed = 3;
int newOutstandingCredit = 3;
assertEquals("Peer cant send more messages than we will have credit for", newOutstandingCredit, prefetch - initialMessageCount + consumed);
final CountDownLatch expected2 = new CountDownLatch(newOutstandingCredit);
((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
expected2.countDown();
}
});
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
newOutstandingCredit, false, false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit)), initialMessageCount + 1, false, false);
testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
Message message = consumer.receiveNoWait();
assertNotNull("Should have received a 3rd message " + consumed, message);
// Ensure all the new messages arrived so that the matching below is deterministic
assertTrue("Expected transfers didnt occur: " + expected2.getCount(), expected2.await(5, TimeUnit.SECONDS));
// Consume the rest of the messages, 4-13. Expect only dispositions initially until the threshold, then
// another flow expanding the window to the limit again, then more dispositions, then another flow as the
// threshold is crossed again when the buffered message count decreases further.
for (consumed = 4; consumed < prefetch + newOutstandingCredit; consumed++) {
if(consumed == 3 + newOutstandingCredit) {
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit)));
} else if(consumed == 3 + newOutstandingCredit * 2) {
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit * 2)));
}
testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
message = consumer.receiveNoWait();
assertNotNull("Should have received a message " + consumed, message);
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testLinkCreditReplenishmentWithPrefetchTrickleFeed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int prefetch = 10;
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
// Expect initial credit to be sent, respond with a single message.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, false, false, equalTo(UnsignedInteger.valueOf(prefetch)), 1, false, false);
MessageConsumer consumer = session.createConsumer(queue);
// Now consume a message and have the peer send another in response, repeat until consumed 15 messages
int consumed = 0;
for (consumed = 1; consumed <= 15; consumed ++) {
if(consumed == 5 || consumed == 10 || consumed == 15) {
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(prefetch)));
}
testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
if(consumed != 15) {
testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, amqpValueNullContent, consumed +1);
}
Message message = consumer.receive(3000);
assertNotNull("Should have received a message " + consumed, message);
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
}