blob: c292c718bb63462a3391508efe791ff85b4d0ab3 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.unit.ct;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
/**
 * Broker restart recovery tests for durable subscriptions.
 * <p>
 * Every test restarts the broker part way through and is skipped (with a warning in the log)
 * when {@code isBrokerStorePersistent()} reports that the broker store is not persistent.
 */
public class DurableSubscriberTest extends QpidBrokerTestCase
{
    private final String _topicName = "durableSubscriberTopic";

    /**
     * Test strategy:
     * create and register a durable subscriber then close it
     * create a publisher and send a persistent message followed by a non persistent message
     * restart the broker
     * recreate the durable subscriber and check that the first (persistent) message is received
     */
    public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
    {
        if (!isBrokerStorePersistent())
        {
            _logger.warn("Test skipped due to requirement of a persistent store");
            return;
        }

        TopicConnectionFactory factory = getConnectionFactory();
        Topic topic = (Topic) getInitialContext().lookup(_topicName);

        //create and register a durable subscriber then close it
        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub");
        durConnection.start();
        durSub1.close();
        durSession.close();
        //close() rather than stop(): stop() only pauses delivery and leaves the connection open
        durConnection.close();

        //create a publisher and send a persistent message followed by a non persistent message
        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
        try
        {
            TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = pubSession.createPublisher(topic);
            Message message = pubSession.createMessage();
            message.setIntProperty("count", 1);
            publisher.publish(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY,
                              javax.jms.Message.DEFAULT_TIME_TO_LIVE);
            message.setIntProperty("count", 2);
            publisher.publish(message, javax.jms.DeliveryMode.NON_PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY,
                              javax.jms.Message.DEFAULT_TIME_TO_LIVE);
            publisher.close();
            pubSession.close();
        }
        finally
        {
            //the publishing connection is not needed any more, do not leak it across the restart
            pubConnection.close();
        }

        //now restart the server
        restartBrokerLoggingFailure();

        //now recreate the durable subscriber and check the received messages
        factory = getConnectionFactory();
        topic = (Topic) getInitialContext().lookup(_topicName);
        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub");
        durConnection2.start();

        Message m1 = durSub2.receive(1000);
        assertNotNull("testDurSubRestoredAfterNonPersistentMessageSent test failed. no message was returned", m1);
        assertEquals("testDurSubRestoredAfterNonPersistentMessageSent test failed. Wrong message was returned.",
                     1, m1.getIntProperty("count"));

        //a durable subscription must not be deleted while its subscriber is still active
        durSub2.close();
        durSession2.unsubscribe("dursub");
        durConnection2.close();
    }

    /**
     * create and register a durable subscriber with a message selector and then close it
     * restart the broker
     * create a publisher and send 5 right messages and 5 wrong messages
     * recreate the durable subscriber and check we receive the 5 expected messages
     */
    public void testDurSubRestoresMessageSelector() throws Exception
    {
        if (!isBrokerStorePersistent())
        {
            _logger.warn("Test skipped due to requirement of a persistent store");
            return;
        }

        TopicConnectionFactory factory = getConnectionFactory();
        Topic topic = (Topic) getInitialContext().lookup(_topicName);

        //create and register a durable subscriber with a message selector and then close it
        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
        durConnection.start();
        durSub1.close();
        durSession.close();
        //close() rather than stop(): stop() only pauses delivery and leaves the connection open
        durConnection.close();

        //now restart the server
        restartBrokerLoggingFailure();

        //send 5 messages matching the selector and 5 which do not
        topic = (Topic) getInitialContext().lookup(_topicName);
        factory = getConnectionFactory();
        publishSelectorTestMessages(factory, topic, 5);

        //now recreate the durable subscriber and check the received messages
        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
        durConnection2.start();
        for (int i = 0; i < 5; i++)
        {
            Message message = durSub2.receive(1000);
            assertNotNull("testDurSubRestoresMessageSelector test failed. no message was returned", message);
            assertEquals("testDurSubRestoresMessageSelector test failed. message selector not reset",
                         "true", message.getStringProperty("testprop"));
        }

        //a durable subscription must not be deleted while its subscriber is still active
        durSub2.close();
        durSession2.unsubscribe("dursub");
        durConnection2.close();
    }

    /**
     * create and register a durable subscriber without a message selector and then unsubscribe it
     * create and register a durable subscriber with a message selector and then close it
     * restart the broker
     * send matching and non matching messages
     * recreate and register the durable subscriber with a message selector
     * verify only the matching messages are received
     */
    public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
    {
        if (!isBrokerStorePersistent())
        {
            _logger.warn("Test skipped due to requirement of a persistent store");
            return;
        }

        final String subName = getTestQueueName();
        TopicConnectionFactory factory = getConnectionFactory();
        Topic topic = (Topic) getInitialContext().lookup(_topicName);

        //create and register a durable subscriber then unsubscribe it
        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, subName);
        durConnection.start();
        durSub1.close();
        durSession.unsubscribe(subName);
        durSession.close();
        durConnection.close();

        //create and register a durable subscriber with a message selector and then close it
        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, subName, "testprop='true'", false);
        durConnection2.start();
        durSub2.close();
        durSession2.close();
        durConnection2.close();

        //now restart the server
        restartBrokerLoggingFailure();

        //send messages matching and not matching the selector
        publishSelectorTestMessages(factory, topic, 5);

        //now recreate the durable subscriber with selector to check there are no exceptions generated
        //and then verify the messages are received correctly
        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, subName, "testprop='true'", false);
        durConnection3.start();
        for (int i = 0; i < 5; i++)
        {
            Message message = durSub3.receive(2000);
            assertNotNull("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i
                          + " was not returned", message);
            assertEquals("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector",
                         "true", message.getStringProperty("testprop"));
        }

        durSub3.close();
        durSession3.unsubscribe(subName);
        durSession3.close();
        durConnection3.close();
    }

    /**
     * create and register a durable subscriber with a message selector and then unsubscribe it
     * create and register a durable subscriber without a message selector and then close it
     * restart the broker
     * send matching and non matching messages
     * recreate and register the durable subscriber without a message selector
     * verify ALL the sent messages are received
     */
    public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception
    {
        if (!isBrokerStorePersistent())
        {
            _logger.warn("Test skipped due to requirement of a persistent store");
            return;
        }

        final String subName = getTestQueueName();
        TopicConnectionFactory factory = getConnectionFactory();
        Topic topic = (Topic) getInitialContext().lookup(_topicName);

        //create and register a durable subscriber with selector then unsubscribe it
        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, subName, "testprop='true'", false);
        durConnection.start();
        durSub1.close();
        durSession.unsubscribe(subName);
        durSession.close();
        durConnection.close();

        //create and register a durable subscriber without the message selector and then close it
        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, subName);
        durConnection2.start();
        durSub2.close();
        durSession2.close();
        durConnection2.close();

        //now restart the server
        restartBrokerLoggingFailure();

        //send messages matching and not matching the original used selector
        publishSelectorTestMessages(factory, topic, 5);

        //now recreate the durable subscriber without selector to check there are no exceptions generated
        //then verify ALL messages sent are received
        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, subName);
        durConnection3.start();
        for (int i = 1; i <= 10; i++)
        {
            Message message = durSub3.receive(2000);
            assertNotNull("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i
                          + " was not received", message);
        }

        durSub3.close();
        durSession3.unsubscribe(subName);
        durSession3.close();
        durConnection3.close();
    }

    /**
     * create a durable subscriber selecting "Match = True" and check it only gets the matching message
     * leave a matching message unconsumed and close the subscriber
     * resubscribe under the same name selecting "Match = false" and check the backing queue is empty
     * check the new subscriber only gets the non matching message
     * restart the broker
     * resubscribe selecting "Match = False" and check messages are still filtered as expected
     */
    public void testResubscribeWithChangedSelectorAndRestart() throws Exception
    {
        if (!isBrokerStorePersistent())
        {
            _logger.warn("Test skipped due to requirement of a persistent store");
            return;
        }

        //used as the topic name, the subscription name and the prefix of the message texts
        final String name = "testResubscribeWithChangedSelectorAndRestart";

        Connection conn = getConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        AMQTopic topic = new AMQTopic((AMQConnection) conn, name);
        MessageProducer producer = session.createProducer(topic);

        // Create durable subscriber that selects the messages with Match = True
        TopicSubscriber subA = session.createDurableSubscriber(topic, name, "Match = True", false);

        // Send 1 matching message and 1 non-matching message
        sendMatchingAndNonMatchingPair(session, producer, name);

        Message rMsg = subA.receive(1000);
        assertNotNull(rMsg);
        assertEquals("Content was wrong", name + "1", ((TextMessage) rMsg).getText());

        // Queue has no messages left
        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + name);
        assertQueueDepth("Msg count should be 0", 0, session, subQueue);

        rMsg = subA.receive(1000);
        assertNull(rMsg);

        // Send another 1 matching message and 1 non-matching message
        sendMatchingAndNonMatchingPair(session, producer, name);

        // Disconnect subscriber without receiving the message to
        //leave it on the underlying queue
        subA.close();

        // Reconnect with a new selector that selects the messages with Match = false
        TopicSubscriber subB = session.createDurableSubscriber(topic, name, "Match = false", false);

        //verify no messages are now present on the queue as changing selector should have issued
        //an unsubscribe and thus deleted the previous durable backing queue for the subscription.
        //check the dur sub's underlying queue now has msg count 0
        assertQueueDepth("Msg count should be 0", 0, session, subQueue);

        // Check that new messages are received properly
        sendMatchingAndNonMatchingPair(session, producer, name);

        rMsg = subB.receive(1000);
        assertNotNull(rMsg);
        assertEquals("Content was wrong", name + "2", ((TextMessage) rMsg).getText());

        rMsg = subB.receive(1000);
        assertNull(rMsg);

        //check the dur sub's underlying queue now has msg count 0
        assertQueueDepth("Msg count should be 0", 0, session, subQueue);
        conn.close();

        //now restart the server
        restartBrokerLoggingFailure();

        // Reconnect to broker
        Connection connection = getConnectionFactory().createConnection("guest", "guest");
        connection.start();
        Session restartedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        AMQTopic restartedTopic = new AMQTopic((AMQConnection) connection, name);
        MessageProducer restartedProducer = restartedSession.createProducer(restartedTopic);

        //verify no messages now present on the queue after we restart the broker
        //check the dur sub's underlying queue now has msg count 0
        assertQueueDepth("Msg count should be 0", 0, restartedSession, subQueue);

        // Reconnect with a selector that selects the messages with Match = False
        TopicSubscriber subC = restartedSession.createDurableSubscriber(restartedTopic, name, "Match = False", false);

        // Check that new messages are still sent and received properly
        sendMatchingAndNonMatchingPair(restartedSession, restartedProducer, name);

        //check the dur sub's underlying queue now has msg count 1
        assertQueueDepth("Msg count should be 1", 1, restartedSession, subQueue);

        rMsg = subC.receive(1000);
        assertNotNull(rMsg);
        assertEquals("Content was wrong", name + "2", ((TextMessage) rMsg).getText());

        rMsg = subC.receive(1000);
        assertNull(rMsg);

        //a durable subscription must not be deleted while its subscriber is still active
        subC.close();
        restartedSession.unsubscribe(name);
        restartedSession.close();
        connection.close();
    }

    /**
     * Restarts the broker, logging any failure before rethrowing it so that the calling test aborts.
     *
     * @throws Exception whatever {@code restartBroker()} threw
     */
    private void restartBrokerLoggingFailure() throws Exception
    {
        try
        {
            restartBroker();
        }
        catch (Exception e)
        {
            _logger.error("problems restarting broker: " + e);
            throw e;
        }
    }

    /**
     * Publishes pairs of messages to the topic over a dedicated connection which is closed
     * again before returning. Each pair is one message with the string property testprop
     * set to "true" followed by one with testprop set to "false".
     *
     * @param factory   factory used to create the publishing connection
     * @param topic     topic to publish to
     * @param pairCount number of true/false message pairs to publish
     */
    private void publishSelectorTestMessages(TopicConnectionFactory factory, Topic topic, int pairCount)
            throws Exception
    {
        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
        try
        {
            TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = pubSession.createPublisher(topic);
            for (int i = 0; i < pairCount; i++)
            {
                Message message = pubSession.createMessage();
                message.setStringProperty("testprop", "true");
                publisher.publish(message);

                message = pubSession.createMessage();
                message.setStringProperty("testprop", "false");
                publisher.publish(message);
            }
            publisher.close();
            pubSession.close();
        }
        finally
        {
            pubConnection.close();
        }
    }

    /**
     * Sends two text messages: textPrefix + "1" with the boolean property Match set to true,
     * followed by textPrefix + "2" with Match set to false.
     *
     * @param session    session used to create the messages
     * @param producer   producer used to send them
     * @param textPrefix prefix of the text of both messages
     */
    private void sendMatchingAndNonMatchingPair(Session session, MessageProducer producer, String textPrefix)
            throws Exception
    {
        TextMessage msg = session.createTextMessage(textPrefix + "1");
        msg.setBooleanProperty("Match", true);
        producer.send(msg);

        msg = session.createTextMessage(textPrefix + "2");
        msg.setBooleanProperty("Match", false);
        producer.send(msg);
    }

    /**
     * Asserts that the depth reported by the session for the given queue equals the expected value.
     *
     * @param failureMessage message reported when the depth differs
     * @param expected       expected queue depth
     * @param session        session used to query the depth, must be an AMQSession
     * @param queue          queue whose depth is checked
     */
    private void assertQueueDepth(String failureMessage, long expected, Session session, AMQQueue queue)
            throws Exception
    {
        assertEquals(failureMessage, expected, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
    }
}