blob: dad711f1da61e0b1f47a6cce10d99988e596a807 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.junit.Test;
/**
* Test for Consumer state when various consumer presettle options are applied.
*/
public class PresettledConsumerIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
//----- Test the jms.presettlePolicy.presettleAll option -----------------//
@Test(timeout = 20000)
public void testPresettleAllConfigurationAppliedToTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
}
@Test(timeout = 20000)
public void testPresettledAllConfigurationAppliedToQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
}
@Test(timeout = 20000)
public void testPresettledAllConfigurationAppliedToTempTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testPresettledAllConfigurationAppliedToTempQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
}
//----- Test the jms.presettlePolicy.presettleTopicConsumers option ------//
@Test(timeout = 20000)
public void testPresettleTopicConsumersConfigurationAppliedToTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
}
@Test(timeout = 20000)
public void testPresettledTopicConsumersConfigurationAppliedToQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Queue.class);
}
@Test(timeout = 20000)
public void testPresettledTopicConsumersConfigurationAppliedToTempTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testPresettledTopicConsumersConfigurationAppliedToTempQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryQueue.class);
}
//----- Test the jms.presettlePolicy.presettleQueueConsumers option ----- //
@Test(timeout = 20000)
public void testPresettleQueueConsumersConfigurationAppliedToTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Topic.class);
}
@Test(timeout = 20000)
public void testPresettledQueueConsumersConfigurationAppliedToQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
}
@Test(timeout = 20000)
public void testPresettledQueueConsumersConfigurationAppliedToTempTopic() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testPresettledQueueConsumersConfigurationAppliedToTempQueue() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
}
//----- Test the presettled consumer still settles if needed -------------//
@Test(timeout = 20000)
public void testPresettledTopicConsumerSettlesWhenNeeded() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Topic.class);
}
@Test(timeout = 20000)
public void testPresettledQueueConsumerSettlesWhenNeeded() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Queue.class);
}
@Test(timeout = 20000)
public void testPresettledTempTopicConsumerSettlesWhenNeeded() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testPresettledTempQueueConsumerSettlesWhenNeeded() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryQueue.class);
}
//----- Test that transacted consumers always send unsettled -------------//
@Test(timeout = 20000)
public void testTransactedTopicConsumerNotSettledPresettleAll() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, Topic.class);
}
@Test(timeout = 20000)
public void testTransactedQueueConsumerNotSettledPresettleAll() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, Queue.class);
}
@Test(timeout = 20000)
public void testTransactedTempTopicConsumerNotSettledPresettleAll() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testTransactedTempQueueConsumerNotSettledPresettleAll() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, TemporaryQueue.class);
}
@Test(timeout = 20000)
public void testTransactedTopicConsumerNotSettledPresettleTopicConsumers() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, Topic.class);
}
@Test(timeout = 20000)
public void testTransactedQueueConsumerNotSettledPresettleQueueConsumers() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, Queue.class);
}
@Test(timeout = 20000)
public void testTransactedTempTopicConsumerNotSettledPresettleTopicConsumers() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, TemporaryTopic.class);
}
@Test(timeout = 20000)
public void testTransactedTempQueueConsumerNotSettledPresettleQueueConsumers() throws Exception {
String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, true, false, false, TemporaryQueue.class);
}
//----- Test Method implementation ---------------------------------------//
private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, uriOptions);
connection.start();
testPeer.expectBegin();
Session session = null;
Binary txnId = null;
if (transacted) {
// Expect the session, with an immediate link to the transaction coordinator
// using a target with the expected capabilities only.
CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} else {
// Use client ack so the receipt of the settled disposition is controllable.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
Destination destination = null;
if (destType == Queue.class) {
destination = session.createQueue("MyQueue");
} else if (destType == Topic.class) {
destination = session.createTopic("MyTopis");
} else if (destType == TemporaryQueue.class) {
String dynamicAddress = "myTempQueueAddress";
testPeer.expectTempQueueCreationAttach(dynamicAddress);
destination = session.createTemporaryQueue();
} else if (destType == TemporaryTopic.class) {
String dynamicAddress = "myTempTopicAddress";
testPeer.expectTempTopicCreationAttach(dynamicAddress);
destination = session.createTemporaryTopic();
} else {
fail("unexpected type");
}
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
if (senderSettled) {
testPeer.expectSettledReceiverAttach();
} else {
testPeer.expectReceiverAttach();
}
// Send a settled transfer, client should not send any dispositions
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, transferSettled);
if (!transferSettled) {
if (!transacted) {
testPeer.expectDispositionThatIsAcceptedAndSettled();
} else {
// Then expect an *settled* TransactionalState disposition for each message
// once received by the consumer
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId));
stateMatcher.withOutcome(new AcceptedMatcher());
testPeer.expectDisposition(true, stateMatcher);
}
}
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(3000));
if (transacted) {
testPeer.expectDischarge(txnId, true);
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
}