| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.jms.integration; |
| |
| import static org.hamcrest.Matchers.arrayContaining; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| |
| import javax.jms.Connection; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TemporaryQueue; |
| import javax.jms.TextMessage; |
| import javax.jms.Topic; |
| import javax.jms.TopicSubscriber; |
| |
| import org.apache.qpid.jms.JmsConnection; |
| import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties; |
| import org.apache.qpid.jms.test.QpidJmsTestCase; |
| import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Released; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; |
| import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.junit.Test; |
| |
| public class SessionIntegrationTest extends QpidJmsTestCase { |
| private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); |
| |
| @Test(timeout = 5000) |
| public void testCloseSession() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| assertNotNull("Session should not be null", session); |
| testPeer.expectEnd(); |
| session.close(); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateProducer() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| testPeer.expectSenderAttach(); |
| |
| Queue queue = session.createQueue("myQueue"); |
| session.createProducer(queue); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateProducerLinkSupportsAcceptedAndRejectedOutcomes() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String queueName = "myQueue"; |
| |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL)); |
| //TODO: what default outcome for producers? |
| //Accepted normally, Rejected for transaction controller? |
| //sourceMatcher.withDefaultOutcome(outcomeMatcher); |
| |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(queueName)); |
| |
| testPeer.expectSenderAttach(sourceMatcher, targetMatcher, false, false); |
| |
| Queue queue = session.createQueue(queueName); |
| session.createProducer(queue); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateConsumer() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlow(); |
| |
| Queue queue = session.createQueue("myQueue"); |
| session.createConsumer(queue); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateTemporaryQueue() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String dynamicAddress = "myTempQueueAddress"; |
| testPeer.expectTempQueueCreationAttach(dynamicAddress); |
| |
| TemporaryQueue tempQueue = session.createTemporaryQueue(); |
| assertNotNull("TemporaryQueue object was null", tempQueue); |
| assertNotNull("TemporaryQueue queue name was null", tempQueue.getQueueName()); |
| assertEquals("TemporaryQueue name not as expected", dynamicAddress, tempQueue.getQueueName()); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateDurableTopicSubscriber() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| String subscriptionName = "mySubscription"; |
| |
| testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); |
| testPeer.expectLinkFlow(); |
| |
| TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName); |
| assertNotNull("TopicSubscriber object was null", subscriber); |
| assertFalse("TopicSubscriber should not be no-local", subscriber.getNoLocal()); |
| assertNull("TopicSubscriber should not have a selector", subscriber.getMessageSelector()); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCloseDurableTopicSubscriberDetachesWithCloseFalse() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| String subscriptionName = "mySubscription"; |
| |
| testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); |
| testPeer.expectLinkFlow(); |
| |
| TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName); |
| |
| testPeer.expectDetach(false, true, false); |
| subscriber.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsSupported() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| //Add capability to indicate support for ANONYMOUS-RELAY |
| Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY}; |
| |
| Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| //Expect and accept a link to the anonymous relay node |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(nullValue()); |
| targetMatcher.withDynamic(nullValue());//default = false |
| targetMatcher.withDurable(nullValue());//default = none/0 |
| |
| testPeer.expectSenderAttach(targetMatcher, false, false); |
| |
| //Create an anonymous producer |
| MessageProducer producer = session.createProducer(null); |
| assertNotNull("Producer object was null", producer); |
| |
| //Expect a new message sent with this producer to use the link to the anonymous relay matched above |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createMessage(); |
| producer.send(dest, message); |
| |
| //Repeat the send and observe another transfer on the existing link. |
| testPeer.expectTransfer(messageMatcher); |
| |
| producer.send(dest, message); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(false); |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(true); |
| } |
| |
| private void doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(boolean deferAttachFrameWrite) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| //Add capability to indicate support for ANONYMOUS-RELAY |
| Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY}; |
| |
| Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| //Expect and refuse a link to the anonymous relay node |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(nullValue()); |
| targetMatcher.withDynamic(nullValue());//default = false |
| targetMatcher.withDurable(nullValue());//default = none/0 |
| |
| testPeer.expectSenderAttach(targetMatcher, true, false); |
| //Expect the detach response to the test peer closing the producer link after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| try { |
| session.createProducer(null); |
| fail("Expected producer creation to fail if anonymous-relay link refused"); |
| } catch (JMSException jmse) { |
| //expected |
| } |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateProducerFailsWhenLinkRefusedTestImpl(false); |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateProducerFailsWhenLinkRefusedTestImpl(true); |
| } |
| |
| private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws JMSException, InterruptedException, Exception, IOException { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| //Expect a link to a topic node, which we will then refuse |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(topicName)); |
| targetMatcher.withDynamic(nullValue());//default = false |
| targetMatcher.withDurable(nullValue());//default = none/0 |
| |
| testPeer.expectSenderAttach(targetMatcher, true, deferAttachResponseWrite); |
| //Expect the detach response to the test peer closing the producer link after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| try { |
| //Create a producer, expect it to throw exception due to the link-refusal |
| session.createProducer(dest); |
| fail("Producer creation should have failed when link was refused"); |
| } catch(JMSException jmse) { |
| //Expected |
| } |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| |
| //DO NOT add capability to indicate server support for ANONYMOUS-RELAY |
| |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| // Expect no AMQP traffic when we create the anonymous producer, as it will wait |
| // for an actual send to occur on the producer before anything occurs on the wire |
| |
| //Create an anonymous producer |
| MessageProducer producer = session.createProducer(null); |
| assertNotNull("Producer object was null", producer); |
| |
| //Expect a new message sent by the above producer to cause creation of a new |
| //sender link to the given destination, then closing the link after the message is sent. |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(topicName)); |
| targetMatcher.withDynamic(nullValue());//default = false |
| targetMatcher.withDurable(nullValue());//default = none/0 |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| |
| testPeer.expectSenderAttach(targetMatcher, false, false); |
| testPeer.expectTransfer(messageMatcher); |
| testPeer.expectDetach(true, true, true); |
| |
| Message message = session.createMessage(); |
| producer.send(dest, message); |
| |
| //Repeat the send and observe another attach->transfer->detach. |
| testPeer.expectSenderAttach(targetMatcher, false, false); |
| testPeer.expectTransfer(messageMatcher); |
| testPeer.expectDetach(true, true, true); |
| |
| producer.send(dest, message); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception { |
| doCommitTransactedSessionWithConsumerTestImpl(1, 1); |
| } |
| |
| @Test(timeout=5000) |
| public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { |
| doCommitTransactedSessionWithConsumerTestImpl(5, 2); |
| } |
| |
| private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); |
| |
| for (int i = 1; i <= consumeCount; i++) { |
| // Then expect an *settled* TransactionalState disposition for each message once received by the consumer |
| TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); |
| stateMatcher.withTxnId(equalTo(txnId)); |
| stateMatcher.withOutcome(new AcceptedMatcher()); |
| |
| testPeer.expectDisposition(true, stateMatcher); |
| } |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| |
| for (int i = 1; i <= consumeCount; i++) { |
| Message receivedMessage = messageConsumer.receive(1000); |
| |
| assertNotNull(receivedMessage); |
| assertTrue(receivedMessage instanceof TextMessage); |
| } |
| |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with accepted and settled disposition to indicate the commit succeeded |
| Discharge discharge = new Discharge(); |
| discharge.setFail(false); |
| discharge.setTxnId(txnId); |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); |
| |
| session.commit(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception { |
| doRollbackTransactedSessionWithConsumerTestImpl(1, 1); |
| } |
| |
| @Test(timeout=5000) |
| public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { |
| doRollbackTransactedSessionWithConsumerTestImpl(5, 2); |
| } |
| |
| private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); |
| |
| for (int i = 1; i <= consumeCount; i++) { |
| // Then expect a *settled* TransactionalState disposition for each message once received by the consumer |
| TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); |
| stateMatcher.withTxnId(equalTo(txnId)); |
| stateMatcher.withOutcome(new AcceptedMatcher()); |
| |
| testPeer.expectDisposition(true, stateMatcher); |
| } |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| |
| for (int i = 1; i <= consumeCount; i++) { |
| Message receivedMessage = messageConsumer.receive(1000); |
| |
| assertNotNull(receivedMessage); |
| assertTrue(receivedMessage instanceof TextMessage); |
| } |
| |
| // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' |
| testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO)); |
| |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with accepted and settled disposition to indicate the rollback succeeded |
| Discharge discharge = new Discharge(); |
| discharge.setFail(true); |
| discharge.setTxnId(txnId); |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); |
| |
| // Expect the messages that were not consumed to be released |
| int unconsumed = transferCount - consumeCount; |
| for (int i = 1; i <= unconsumed; i++) { |
| testPeer.expectDisposition(true, new ReleasedMatcher()); |
| } |
| |
| // Expect the consumer to be 'started' again as rollback completes |
| testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO)); |
| |
| session.rollback(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| int messageCount = 5; |
| ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| // Create a consumer and fill the prefetch with messages, which we wont consume any of |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount); |
| |
| session.createConsumer(queue); |
| |
| // Create a producer to use in provoking creation of the AMQP transaction |
| testPeer.expectSenderAttach(); |
| MessageProducer producer = session.createProducer(queue); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); |
| |
| // Expect the message which provoked creating the transaction |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); |
| testPeer.expectTransfer(messageMatcher); //TODO: check it is marked as being in the transaction |
| |
| producer.send(session.createMessage()); |
| |
| // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow |
| // frame as we have manipulated that all the credit was already used, i.e. it already stopped. |
| |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with accepted and settled disposition to indicate the rollback succeeded |
| Discharge discharge = new Discharge(); |
| discharge.setFail(true); |
| discharge.setTxnId(txnId); |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); |
| |
| // Expect the messages that were not consumed to be released |
| for (int i = 1; i <= messageCount; i++) { |
| testPeer.expectDisposition(true, new ReleasedMatcher()); |
| } |
| |
| // Expect the consumer to be 'started' again as rollback completes |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); |
| |
| session.rollback(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| int messageCount = 5; |
| ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| // Create a consumer, expect it to flow credit, but don't send it any messages |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); |
| |
| session.createConsumer(queue); |
| |
| // Create a producer to use in provoking creation of the AMQP transaction |
| testPeer.expectSenderAttach(); |
| MessageProducer producer = session.createProducer(queue); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); |
| |
| // Expect the message which provoked creating the transaction |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); |
| testPeer.expectTransfer(messageMatcher); //TODO: check it is marked as being in the transaction |
| |
| producer.send(session.createMessage()); |
| |
| // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow. |
| // Action the drain by filling the prefetch (which is equivalent to this having happened while |
| // the Flow was in flight to the peer), and then DONT send a flow frame back to the client |
| // as it can tell from the messages that all the credit has been used. |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), |
| messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1); |
| |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with accepted and settled disposition to indicate the rollback succeeded |
| Discharge discharge = new Discharge(); |
| discharge.setFail(true); |
| discharge.setTxnId(txnId); |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); |
| |
| // Expect the messages that were not consumed to be released |
| for (int i = 1; i <= messageCount; i++) { |
| testPeer.expectDisposition(true, new ReleasedMatcher()); |
| } |
| |
| // Expect the consumer to be 'started' again as rollback completes |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); |
| |
| session.rollback(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| String queueName = "myQueue"; |
| Queue queue = session.createQueue(queueName); |
| |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(queueName)); |
| sourceMatcher.withDynamic(equalTo(false)); |
| sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL)); |
| ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(false)); |
| sourceMatcher.withDefaultOutcome(outcomeMatcher); |
| |
| testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); |
| testPeer.expectLinkFlow(); |
| |
| session.createConsumer(queue); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout=5000) |
| public void testPrefetchPolicyInfluencesCreditFlow() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| int newPrefetch = 263; |
| ((JmsConnection) connection).getPrefetchPolicy().setAll(newPrefetch); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(newPrefetch))); |
| |
| session.createConsumer(queue); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| } |