blob: c040edd7e08320f43d04570e1dbcd88b94ef6e12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.qpid.jms.message.JmsMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;
public class AmqpAcknowledgementsIntegrationTest extends QpidJmsTestCase {
private static final int INDIVIDUAL_ACK = 101;
private static final int SKIP = -1;
private static final int INVALID = 99;
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout = 20000)
public void testAcknowledgeFailsAfterSessionIsClosed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), 1);
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(6000);
assertNotNull("Message was not recieved", receivedMessage);
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
testPeer.expectEnd();
session.close();
try {
receivedMessage.acknowledge();
fail("Should not be able to acknowledge the message after session closed");
} catch (JMSException jmsex) {}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout = 20000)
public void testDefaultAcceptMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(SKIP, new AcceptedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestAcceptMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.ACCEPTED, new AcceptedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestRejectMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.REJECTED, new RejectedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestReleaseMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestReleaseMessagesClearPropsFirst() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), true);
}
@Test(timeout = 20000)
public void testRequestModifiedFailedMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.MODIFIED_FAILED, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), false);
}
@Test(timeout = 20000)
public void testRequestModifiedFailedUndeliverableHereMessages() throws Exception {
doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE, new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(true)), false);
}
@Test(timeout = 20000)
public void testRequestAcknowledgeMessagesWithInvalidDisposition() throws Exception {
doTestAmqpAcknowledgementTestImpl(INVALID, new AcceptedMatcher(), false);
}
private void doTestAmqpAcknowledgementTestImpl(int disposition, Matcher<?> descriptorMatcher, boolean clearPropsFirst) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
int msgCount = 3;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), msgCount);
for (int i = 1; i <= msgCount; i++) {
testPeer.expectDisposition(true, descriptorMatcher);
}
MessageConsumer messageConsumer = session.createConsumer(queue);
Message lastReceivedMessage = null;
for (int i = 1; i <= msgCount; i++) {
lastReceivedMessage = messageConsumer.receive(6000);
assertNotNull("Message " + i + " was not recieved", lastReceivedMessage);
}
if (disposition != SKIP) {
if (clearPropsFirst) {
lastReceivedMessage.clearProperties();
}
lastReceivedMessage.setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, disposition);
}
if (disposition == INVALID) {
try {
lastReceivedMessage.acknowledge();
fail("Should throw exception due to invalid ack type");
} catch (JMSException jmsex) {}
lastReceivedMessage.setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, JmsMessageSupport.ACCEPTED);
lastReceivedMessage.acknowledge();
} else {
lastReceivedMessage.acknowledge();
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout = 20000)
public void testDefaultAcceptMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(SKIP, new AcceptedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestAcceptMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.ACCEPTED, new AcceptedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestRejectMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.REJECTED, new RejectedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestReleaseMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), false);
}
@Test(timeout = 20000)
public void testRequestReleaseMessagesClearPropsFirstWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), true);
}
@Test(timeout = 20000)
public void testRequestModifiedFailedMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.MODIFIED_FAILED, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), false);
}
@Test(timeout = 20000)
public void testRequestModifiedFailedUndeliverableHereMessagesWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE, new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(true)), false);
}
@Test(timeout = 20000)
public void testRequestAcknowledgeMessagesWithInvalidDispositionWithMessageListener() throws Exception {
doTestAmqpAcknowledgementAsyncTestImpl(INVALID, new AcceptedMatcher(), false);
}
private void doTestAmqpAcknowledgementAsyncTestImpl(int disposition, Matcher<?> descriptorMatcher, boolean clearPropsFirst) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
final int msgCount = 3;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), msgCount);
for (int i = 1; i <= msgCount; i++) {
testPeer.expectDisposition(true, descriptorMatcher);
}
final CountDownLatch receiveCountDown = new CountDownLatch(msgCount);
final AtomicReference<Message> lastReceivedMessage = new AtomicReference<Message>();
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
lastReceivedMessage.set(message);
receiveCountDown.countDown();
}
});
assertTrue("Did not get all messages", receiveCountDown.await(10, TimeUnit.SECONDS));
assertNotNull("Message was not received", lastReceivedMessage.get());
if (disposition != SKIP) {
if (clearPropsFirst) {
lastReceivedMessage.get().clearProperties();
}
lastReceivedMessage.get().setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, disposition);
}
if (disposition == INVALID) {
try {
lastReceivedMessage.get().acknowledge();
fail("Should throw exception due to invalid ack type");
} catch (JMSException jmsex) {}
lastReceivedMessage.get().setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, JmsMessageSupport.ACCEPTED);
lastReceivedMessage.get().acknowledge();
} else {
lastReceivedMessage.get().acknowledge();
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout = 20000)
public void testAcknowledgeIndividualMessages() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(INDIVIDUAL_ACK);
Queue queue = session.createQueue("myQueue");
int msgCount = 6;
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), msgCount, false, false,
Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, true);
MessageConsumer messageConsumer = session.createConsumer(queue);
List<Message> messages = new ArrayList<>();
Message lastReceivedMessage = null;
for (int i = 0; i < msgCount; i++) {
lastReceivedMessage = messageConsumer.receive(3000);
assertNotNull("Message " + i + " was not received", lastReceivedMessage);
messages.add(lastReceivedMessage);
assertEquals("unexpected message number property", i, lastReceivedMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER));
}
List<Integer> ackTypes = new ArrayList<>();
ackTypes.add(SKIP);
ackTypes.add(JmsMessageSupport.ACCEPTED);
ackTypes.add(JmsMessageSupport.REJECTED);
ackTypes.add(JmsMessageSupport.RELEASED);
ackTypes.add(JmsMessageSupport.MODIFIED_FAILED);
ackTypes.add(JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE);
Matcher<?>[] dispositionMatchers = new Matcher<?>[msgCount];
dispositionMatchers[0] = null;
dispositionMatchers[JmsMessageSupport.ACCEPTED] = new AcceptedMatcher();
dispositionMatchers[JmsMessageSupport.REJECTED] = new RejectedMatcher();
dispositionMatchers[JmsMessageSupport.RELEASED] = new ReleasedMatcher();
dispositionMatchers[JmsMessageSupport.MODIFIED_FAILED] = new ModifiedMatcher().withDeliveryFailed(equalTo(true));
dispositionMatchers[JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE] = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(true));
// Acknowledge the messages in a random order with random amqp ack type set (leaving one message without
// any specific set, to check it accepts), verify the individual dispositions have expected delivery state.
Random rand = new Random();
for (int i = 0; i < msgCount; i++) {
Message msg = messages.remove(rand.nextInt(msgCount - i));
int deliveryNumber = msg.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1;
int ackType = ackTypes.remove(rand.nextInt(msgCount - i));
if(ackType != SKIP) {
msg.setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, ackType);
testPeer.expectDisposition(true, dispositionMatchers[ackType], deliveryNumber, deliveryNumber);
} else {
testPeer.expectDisposition(true, new AcceptedMatcher(), deliveryNumber, deliveryNumber);
}
msg.acknowledge();
testPeer.waitForAllHandlersToComplete(3000);
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
}