blob: c6a953dbc25292e4f158192c81678cf96c95ab97 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.unit.topic;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
/**
* Tests durable topic subscriptions: unsubscribing, delivery of messages that were published while the durable
* subscriber was detached, and creation of subscriptions with invalid or changed arguments.
*
* @todo Code to check that a consumer gets only one particular message could be factored into a re-usable method (as
* a static on a base test helper class, e.g. TestUtils).
*
* @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
* out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
* driven by the test model.
*/
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
/**
 * Prepares the test by delegating to the base test case; this class adds no fixture of its own.
 *
 * @throws Exception if the base class set-up fails
 */
protected void setUp() throws Exception
{
super.setUp();
}
/**
 * Cleans up after the test by delegating to the base test case; this class adds no clean-up of its own.
 *
 * @throws Exception if the base class tear-down fails
 */
protected void tearDown() throws Exception
{
super.tearDown();
}
/**
 * Checks that unsubscribing a durable subscription stops delivery to it.
 * <p>
 * A plain consumer and a durable subscriber are attached to the same topic on one connection. Both must
 * receive message "A" exactly once. After the durable subscription has been unsubscribed, message "B" must
 * reach the plain consumer only.
 *
 * @throws Exception on any unexpected failure, which fails the test
 */
public void testUnsubscribe() throws Exception
{
    // Bounded wait (ms) for messages that are expected to arrive, so that a lost message fails the test
    // instead of blocking the whole suite forever.
    final long expectedMessageTimeout = 2000;

    AMQConnection con = (AMQConnection) getConnection("guest", "guest");
    try
    {
        AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");

        _logger.info("Create Session 1");
        Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
        _logger.info("Create Consumer on Session 1");
        MessageConsumer consumer1 = session1.createConsumer(topic);
        _logger.info("Create Producer on Session 1");
        MessageProducer producer = session1.createProducer(topic);

        _logger.info("Create Session 2");
        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
        _logger.info("Create Durable Subscriber on Session 2");
        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");

        _logger.info("Starting connection");
        con.start();

        _logger.info("Producer sending message A");
        producer.send(session1.createTextMessage("A"));

        Message msg;

        _logger.info("Receive message on consumer 1:expecting A");
        msg = consumer1.receive(expectedMessageTimeout);
        assertNotNull("Consumer 1 should get message 'A'.", msg);
        assertEquals("A", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 1 :expecting null");
        msg = consumer1.receive(1000);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        _logger.info("Receive message on consumer 2 :expecting A");
        msg = consumer2.receive(expectedMessageTimeout);
        assertNotNull("Consumer 2 should get message 'A'.", msg);
        assertEquals("A", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 2 :expecting null");
        msg = consumer2.receive(1000);
        assertNull("There should be no more messages for consumption on consumer2.", msg);

        // consumer2 is still open here; the assertions below rely on it receiving nothing further
        // once its subscription has been removed.
        _logger.info("Unsubscribe session2/consumer2");
        session2.unsubscribe("MySubscription");

        _logger.info("Producer sending message B");
        producer.send(session1.createTextMessage("B"));

        _logger.info("Receive message on consumer 1 :expecting B");
        msg = consumer1.receive(expectedMessageTimeout);
        assertNotNull("Consumer 1 should get message 'B'.", msg);
        assertEquals("B", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 1 :expecting null");
        msg = consumer1.receive(1000);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        _logger.info("Receive message on consumer 2 :expecting null");
        msg = consumer2.receive(1000);
        assertNull("Consumer 2 should receive nothing after being unsubscribed.", msg);
    }
    finally
    {
        // Close the connection even when an assertion above fails.
        _logger.info("Close connection");
        con.close();
    }
}
/**
 * Runs the durability scenario of {@code durabilityImpl} with {@code Session.AUTO_ACKNOWLEDGE}.
 *
 * @throws Exception on any unexpected failure, which fails the test
 */
public void testDurabilityAUTOACK() throws Exception
{
durabilityImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
 * Runs the durability scenario of {@code durabilityImplSessionPerConnection} with
 * {@code AMQSession.NO_ACKNOWLEDGE}.
 *
 * @throws Exception on any unexpected failure, which fails the test
 */
public void testDurabilityNOACKSessionPerConnection() throws Exception
{
durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
}
/**
 * Runs the durability scenario of {@code durabilityImplSessionPerConnection} with
 * {@code Session.AUTO_ACKNOWLEDGE}.
 *
 * @throws Exception on any unexpected failure, which fails the test
 */
public void testDurabilityAUTOACKSessionPerConnection() throws Exception
{
durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
}
/**
 * Checks, with every session on a single connection, that a durable subscription retains a message that
 * was published while its subscriber was closed.
 * <p>
 * Message "A" must reach both the plain consumer and the durable subscriber. The durable subscriber is
 * then closed, message "B" is published, and a new subscriber created under the same subscription name
 * must receive "B" exactly once.
 *
 * @param ackMode acknowledge mode for the plain consumer's session and for the session that re-attaches to
 *                the durable subscription; the producer session and the first durable subscriber's session
 *                always use {@code AMQSession.NO_ACKNOWLEDGE}
 * @throws Exception on any unexpected failure, which fails the test
 */
private void durabilityImpl(int ackMode) throws Exception
{
    // Bounded wait (ms) for messages that are expected to arrive, so that a lost message fails the test
    // instead of blocking it forever.
    final long expectedMessageTimeout = 2000;

    AMQConnection con = (AMQConnection) getConnection("guest", "guest");
    try
    {
        AMQTopic topic = new AMQTopic(con, "MyTopic");

        Session session1 = con.createSession(false, ackMode);
        MessageConsumer consumer1 = session1.createConsumer(topic);

        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
        MessageProducer producer = sessionProd.createProducer(topic);

        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");

        con.start();

        // Both consumers must see "A" and nothing else.
        producer.send(session1.createTextMessage("A"));

        Message msg;
        msg = consumer1.receive(expectedMessageTimeout);
        assertNotNull("Consumer 1 should get message 'A'.", msg);
        assertEquals("A", ((TextMessage) msg).getText());
        msg = consumer1.receive(1000);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        msg = consumer2.receive(expectedMessageTimeout);
        assertNotNull("Consumer 2 should get message 'A'.", msg);
        assertEquals("A", ((TextMessage) msg).getText());
        msg = consumer2.receive(1000);
        assertNull("There should be no more messages for consumption on consumer2.", msg);

        // Detach the durable subscriber, then publish "B" while it is away.
        consumer2.close();

        producer.send(session1.createTextMessage("B"));

        _logger.info("Receive message on consumer 1 :expecting B");
        msg = consumer1.receive(500);
        assertNotNull("Consumer 1 should get message 'B'.", msg);
        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 1 :expecting null");
        msg = consumer1.receive(500);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        // Re-attach under the same subscription name; the missed message must be delivered.
        Session session3 = con.createSession(false, ackMode);
        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");

        _logger.info("Receive message on consumer 3 :expecting B");
        msg = consumer3.receive(500);
        assertNotNull("Consumer 3 should get message 'B'.", msg);
        assertEquals("Incorrect Message recevied on consumer3.", "B", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 3 :expecting null");
        msg = consumer3.receive(500);
        assertNull("There should be no more messages for consumption on consumer3.", msg);

        consumer1.close();
        consumer3.close();
        session3.unsubscribe("MySubscription");
    }
    finally
    {
        // Close the connection even when an assertion above fails.
        con.close();
    }
}
/**
 * Checks, with each consumer on its own connection, that a durable subscription retains a message that
 * was published while its subscriber was detached.
 * <p>
 * Message "A" must reach both the plain consumer and the durable subscriber. The durable subscriber's
 * connection is then closed, message "B" is published, and a subscriber on a fresh connection using the
 * same subscription name must receive "B" exactly once.
 *
 * @param ackMode acknowledge mode used for every session created by this scenario
 * @throws Exception on any unexpected failure, which fails the test
 */
private void durabilityImplSessionPerConnection(int ackMode) throws Exception
{
    // Bounded wait (ms) for messages that are expected to arrive, so that a lost message fails the test
    // instead of blocking it forever.
    final long expectedMessageTimeout = 2000;

    Message msg;
    AMQConnection con0 = null;
    AMQConnection con1 = null;
    AMQConnection con2 = null;
    AMQConnection con3 = null;
    try
    {
        // Create producer.
        con0 = (AMQConnection) getConnection("guest", "guest");
        con0.start();
        Session session0 = con0.createSession(false, ackMode);
        AMQTopic topic = new AMQTopic(con0, "MyTopic");
        Session sessionProd = con0.createSession(false, ackMode);
        MessageProducer producer = sessionProd.createProducer(topic);

        // Create consumer 1 on its own connection. It was previously created on session0, which left
        // session1 unused and put the consumer on the producer's connection.
        con1 = (AMQConnection) getConnection("guest", "guest");
        con1.start();
        Session session1 = con1.createSession(false, ackMode);
        MessageConsumer consumer1 = session1.createConsumer(topic);

        // Create consumer 2.
        con2 = (AMQConnection) getConnection("guest", "guest");
        con2.start();
        Session session2 = con2.createSession(false, ackMode);
        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");

        // Send message and check that both consumers get it and only it.
        producer.send(session0.createTextMessage("A"));

        msg = consumer1.receive(500);
        assertNotNull("Message should be available", msg);
        assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
        msg = consumer1.receive(500);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        msg = consumer2.receive(expectedMessageTimeout);
        assertNotNull("Consumer 2 should get message 'A'.", msg);
        assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
        msg = consumer2.receive(500);
        assertNull("There should be no more messages for consumption on consumer2.", msg);

        // Detach the durable subscriber.
        consumer2.close();
        session2.close();
        con2.close();
        con2 = null; // already closed; nothing left for the clean-up below

        // Send message and receive on open consumer.
        producer.send(session0.createTextMessage("B"));

        _logger.info("Receive message on consumer 1 :expecting B");
        msg = consumer1.receive(1000);
        assertNotNull("Consumer 1 should get message 'B'.", msg);
        assertEquals("B", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 1 :expecting null");
        msg = consumer1.receive(1000);
        assertNull("There should be no more messages for consumption on consumer1.", msg);

        // Re-attach a new consumer to the durable subscription, and check that it gets the message that
        // it missed.
        con3 = (AMQConnection) getConnection("guest", "guest");
        con3.start();
        Session session3 = con3.createSession(false, ackMode);
        TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");

        _logger.info("Receive message on consumer 3 :expecting B");
        msg = consumer3.receive(500);
        assertNotNull("Consumer 3 should get message 'B'.", msg);
        assertEquals("Incorrect Message recevied on consumer3.", "B", ((TextMessage) msg).getText());
        _logger.info("Receive message on consumer 3 :expecting null");
        msg = consumer3.receive(500);
        assertNull("There should be no more messages for consumption on consumer3.", msg);

        consumer1.close();
        consumer3.close();
        session3.unsubscribe("MySubscription");

        // On the success path a failing close still fails the test, as it did before.
        con0.close();
        con0 = null;
        con1.close();
        con1 = null;
        con3.close();
        con3 = null;
    }
    finally
    {
        // Only connections still open because of an earlier failure reach this point non-null.
        closeQuietly(con3);
        closeQuietly(con2);
        closeQuietly(con1);
        closeQuietly(con0);
    }
}

/**
 * Closes a connection during failure clean-up, logging rather than propagating any close error so that
 * the original test failure is not masked and the remaining connections are still closed.
 *
 * @param connection the connection to close; {@code null} is ignored
 */
private void closeQuietly(Connection connection)
{
    if (connection == null)
    {
        return;
    }
    try
    {
        connection.close();
    }
    catch (JMSException e)
    {
        _logger.warn("Failed to close connection during test clean-up", e);
    }
}
/**
* This tests the fix for QPID-1085.
* Creates a durable subscriber with an invalid selector, checks that the
* exception is thrown correctly and that the subscription is not created.
* @throws Exception on any unexpected failure, which fails the test
*/
public void testDurableWithInvalidSelector() throws Exception
{
    // Bounded wait (ms) for the message that is expected to arrive, so that a lost message fails the
    // test instead of blocking it forever.
    final long expectedMessageTimeout = 2000;

    Connection conn = getConnection();
    try
    {
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        AMQTopic topic = new AMQTopic((AMQConnection) conn, "MyTestDurableWithInvalidSelectorTopic");
        MessageProducer producer = session.createProducer(topic);

        // Published before any subscription exists; the live subscriber below must never see it.
        producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));

        try
        {
            session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub",
                                            "=TEST 'test", true);
            // Reaching this line means no exception was thrown, whatever the call returned.
            fail("Subscriber should not have been created");
        }
        catch (JMSException e)
        {
            assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
        }

        // The same subscription name must still be usable after the failed attempt.
        TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
        assertNotNull("Subscriber should have been created", liveSubscriber);

        producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));

        Message msg = liveSubscriber.receive(expectedMessageTimeout);
        assertNotNull("Message should have been received", msg);
        assertEquals("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
        assertNull("Should not receive subsequent message", liveSubscriber.receive(200));

        // Close the subscriber first so the subscription is inactive when it is removed.
        liveSubscriber.close();
        session.unsubscribe("testDurableWithInvalidSelectorSub");
    }
    finally
    {
        // The connection was previously never closed by this test.
        conn.close();
    }
}
/**
* This tests the fix for QPID-1085.
* Creates a durable subscriber with an invalid destination, checks that the
* exception is thrown correctly and that the subscription is not created.
* @throws Exception on any unexpected failure, which fails the test
*/
public void testDurableWithInvalidDestination() throws Exception
{
    // Bounded wait (ms) for the message that is expected to arrive, so that a lost message fails the
    // test instead of blocking it forever.
    final long expectedMessageTimeout = 2000;

    Connection conn = getConnection();
    try
    {
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        AMQTopic topic = new AMQTopic((AMQConnection) conn, "testDurableWithInvalidDestinationTopic");

        try
        {
            session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub");
            // Reaching this line means no exception was thrown, whatever the call returned.
            fail("Subscriber should not have been created");
        }
        catch (InvalidDestinationException e)
        {
            // This was expected
        }

        MessageProducer producer = session.createProducer(topic);

        // Published before the live subscription exists; the subscriber below must never see it.
        producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));

        // The same subscription name must still be usable after the failed attempt.
        TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub");
        assertNotNull("Subscriber should have been created", liveSubscriber);

        producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));

        Message msg = liveSubscriber.receive(expectedMessageTimeout);
        assertNotNull("Message should have been received", msg);
        assertEquals("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
        assertNull("Should not receive subsequent message", liveSubscriber.receive(200));

        // Close the subscriber first so the subscription is inactive when it is removed.
        liveSubscriber.close();
        session.unsubscribe("testDurableWithInvalidDestinationsub");
    }
    finally
    {
        // The connection was previously never closed by this test.
        conn.close();
    }
}
/**
* Tests QPID-1202.
* Creates a durable subscription with a selector, then changes that selector on resubscription.
* @throws Exception on any unexpected failure, which fails the test
*/
public void testResubscribeWithChangedSelector() throws Exception
{
    Connection conn = getConnection();
    try
    {
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector");
        MessageProducer producer = session.createProducer(topic);

        // Create durable subscriber that matches A
        TopicSubscriber subA = session.createDurableSubscriber(topic,
                                                               "testResubscribeWithChangedSelector",
                                                               "Match = True", false);

        // Send 1 matching message and 1 non-matching message
        sendMatchingAndNonMatchingMessage(session, producer);

        Message rMsg = subA.receive(1000);
        assertNotNull("subA should receive the message with Match = true", rMsg);
        assertEquals("Content was wrong",
                     "testResubscribeWithChangedSelector1",
                     ((TextMessage) rMsg).getText());

        rMsg = subA.receive(1000);
        assertNull("subA should not receive the message with Match = false", rMsg);

        // Disconnect subscriber
        subA.close();

        // Reconnect with new selector that matches B
        TopicSubscriber subB = session.createDurableSubscriber(topic,
                                                               "testResubscribeWithChangedSelector",
                                                               "Match = False", false);

        // Check messages are received properly
        sendMatchingAndNonMatchingMessage(session, producer);

        rMsg = subB.receive(1000);
        assertNotNull("subB should receive the message with Match = false", rMsg);
        assertEquals("Content was wrong",
                     "testResubscribeWithChangedSelector2",
                     ((TextMessage) rMsg).getText());

        rMsg = subB.receive(1000);
        assertNull("subB should not receive the message with Match = true", rMsg);

        // Close the subscriber first so the subscription is inactive when it is removed.
        subB.close();
        session.unsubscribe("testResubscribeWithChangedSelector");
    }
    finally
    {
        // The connection was previously never closed by this test.
        conn.close();
    }
}
/**
 * Publishes two text messages through the given producer: first one whose boolean property "Match" is
 * {@code true}, then one whose "Match" property is {@code false}.
 *
 * @param session  session used to create the messages
 * @param producer producer used to send them
 * @throws JMSException if creating, populating or sending a message fails
 */
private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException
{
    final TextMessage matchTrue = session.createTextMessage("testResubscribeWithChangedSelector1");
    matchTrue.setBooleanProperty("Match", true);
    producer.send(matchTrue);

    final TextMessage matchFalse = session.createTextMessage("testResubscribeWithChangedSelector2");
    matchFalse.setBooleanProperty("Match", false);
    producer.send(matchFalse);
}
/**
 * Returns a JUnit test suite constructed from this test class.
 *
 * @return a {@code junit.framework.TestSuite} built from {@code DurableSubscriptionTest.class}
 */
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);
}
}