| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.test.unit.topic; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQQueue; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.client.AMQTopic; |
| import org.apache.qpid.management.common.JMXConnnectionFactory; |
| import org.apache.qpid.test.utils.QpidBrokerTestCase; |
| |
| import javax.jms.Connection; |
| import javax.jms.InvalidDestinationException; |
| import javax.jms.InvalidSelectorException; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import javax.jms.Topic; |
| import javax.jms.TopicSubscriber; |
| import javax.management.MBeanServerConnection; |
| import javax.management.ObjectName; |
| import javax.management.remote.JMXConnector; |
| import java.io.IOException; |
| import java.util.Set; |
| |
| /** |
| * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as |
| * a static on a base test helper class, e.g. TestUtils. |
| * |
| * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored |
| * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit, |
| * driven by the test model. |
| */ |
| public class DurableSubscriptionTest extends QpidBrokerTestCase |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); |
| |
| /** Timeout for receive() if we are expecting a message */ |
| private static final long POSITIVE_RECEIVE_TIMEOUT = 2000; |
| |
| /** Timeout for receive() if we are not expecting a message */ |
| private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; |
| |
| private JMXConnector _jmxc; |
| private MBeanServerConnection _mbsc; |
| private static final String USER = "admin"; |
| private static final String PASSWORD = "admin"; |
| private boolean _jmxConnected; |
| |
| public void setUp() throws Exception |
| { |
| setConfigurationProperty("management.enabled", "true"); |
| _jmxConnected=false; |
| super.setUp(); |
| } |
| |
| public void tearDown() throws Exception |
| { |
| if(_jmxConnected) |
| { |
| try |
| { |
| _jmxc.close(); |
| } |
| catch (IOException e) |
| { |
| e.printStackTrace(); |
| } |
| } |
| |
| super.tearDown(); |
| } |
| |
| public void testUnsubscribe() throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic"); |
| _logger.info("Create Session 1"); |
| Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); |
| _logger.info("Create Consumer on Session 1"); |
| MessageConsumer consumer1 = session1.createConsumer(topic); |
| _logger.info("Create Producer on Session 1"); |
| MessageProducer producer = session1.createProducer(topic); |
| |
| _logger.info("Create Session 2"); |
| Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); |
| _logger.info("Create Durable Subscriber on Session 2"); |
| TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); |
| |
| _logger.info("Starting connection"); |
| con.start(); |
| |
| _logger.info("Producer sending message A"); |
| producer.send(session1.createTextMessage("A")); |
| |
| //check the dur sub's underlying queue now has msg count 1 |
| AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); |
| assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true)); |
| |
| Message msg; |
| _logger.info("Receive message on consumer 1:expecting A"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("A", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| _logger.info("Receive message on consumer 2:expecting A"); |
| msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("A", ((TextMessage) msg).getText()); |
| msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| assertEquals(null, msg); |
| |
| //check the dur sub's underlying queue now has msg count 0 |
| assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue, true)); |
| |
| consumer2.close(); |
| _logger.info("Unsubscribe session2/consumer2"); |
| session2.unsubscribe("MySubscription"); |
| |
| ((AMQSession<?, ?>) session2).sync(); |
| |
| if(isJavaBroker()) |
| { |
| //Verify that the queue was deleted by querying for its JMX MBean |
| _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", |
| getManagementPort(getPort()), USER, PASSWORD); |
| |
| _jmxConnected = true; |
| _mbsc = _jmxc.getMBeanServerConnection(); |
| |
| //must replace the occurrence of ':' in queue name with '-' |
| String queueObjectNameText = "clientid" + "-" + "MySubscription"; |
| |
| ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" |
| + queueObjectNameText + ",*"); |
| |
| Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null); |
| |
| if(objectInstances.size() != 0) |
| { |
| fail("Queue MBean was found. Expected queue to have been deleted"); |
| } |
| else |
| { |
| _logger.info("Underlying dueue for the durable subscription was confirmed deleted."); |
| } |
| } |
| |
| //verify unsubscribing the durable subscriber did not affect the non-durable one |
| _logger.info("Producer sending message B"); |
| producer.send(session1.createTextMessage("B")); |
| |
| _logger.info("Receive message on consumer 1 :expecting B"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("B", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| _logger.info("Close connection"); |
| con.close(); |
| } |
| |
| public void testDurabilityNOACK() throws Exception |
| { |
| durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false); |
| } |
| |
| public void testDurabilityAUTOACK() throws Exception |
| { |
| durabilityImpl(Session.AUTO_ACKNOWLEDGE, false); |
| } |
| |
| public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception |
| { |
| if(!isBrokerStorePersistent()) |
| { |
| System.out.println("The broker store is not persistent, skipping this test."); |
| return; |
| } |
| |
| durabilityImpl(Session.AUTO_ACKNOWLEDGE, true); |
| } |
| |
| public void testDurabilityNOACKSessionPerConnection() throws Exception |
| { |
| durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE); |
| } |
| |
| public void testDurabilityAUTOACKSessionPerConnection() throws Exception |
| { |
| durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| AMQTopic topic = new AMQTopic(con, "MyTopic"); |
| Session session1 = con.createSession(false, ackMode); |
| MessageConsumer consumer1 = session1.createConsumer(topic); |
| |
| Session sessionProd = con.createSession(false, ackMode); |
| MessageProducer producer = sessionProd.createProducer(topic); |
| |
| Session session2 = con.createSession(false, ackMode); |
| TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); |
| |
| con.start(); |
| |
| //send message A and check both consumers receive |
| producer.send(session1.createTextMessage("A")); |
| |
| Message msg; |
| _logger.info("Receive message on consumer 1 :expecting A"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("A", ((TextMessage) msg).getText()); |
| msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| _logger.info("Receive message on consumer 2 :expecting A"); |
| msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("A", ((TextMessage) msg).getText()); |
| msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK) |
| producer.send(session1.createTextMessage("B")); |
| |
| _logger.info("Receive message on consumer 1 :expecting B"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Consumer 1 should get message 'B'.", msg); |
| assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(500); |
| assertNull("There should be no more messages for consumption on consumer1.", msg); |
| |
| consumer2.close(); |
| session2.close(); |
| |
| //Send message C, then connect consumer 3 to durable subscription and get |
| //message B if not using NO_ACK, then receive C with consumer 1 and 3 |
| producer.send(session1.createTextMessage("C")); |
| |
| Session session3 = con.createSession(false, ackMode); |
| MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); |
| |
| if(ackMode == AMQSession.NO_ACKNOWLEDGE) |
| { |
| //Do nothing if NO_ACK was used, as prefetch means the message was dropped |
| //when we didn't call receive() to get it before closing consumer 2 |
| } |
| else |
| { |
| _logger.info("Receive message on consumer 3 :expecting B"); |
| msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Consumer 3 should get message 'B'.", msg); |
| assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText()); |
| } |
| |
| _logger.info("Receive message on consumer 1 :expecting C"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Consumer 1 should get message 'C'.", msg); |
| assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(500); |
| assertNull("There should be no more messages for consumption on consumer1.", msg); |
| |
| _logger.info("Receive message on consumer 3 :expecting C"); |
| msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Consumer 3 should get message 'C'.", msg); |
| assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 3 :expecting null"); |
| msg = consumer3.receive(500); |
| assertNull("There should be no more messages for consumption on consumer3.", msg); |
| |
| consumer1.close(); |
| consumer3.close(); |
| |
| session3.unsubscribe("MySubscription"); |
| |
| con.close(); |
| |
| if(restartBroker) |
| { |
| try |
| { |
| restartBroker(); |
| } |
| catch (Exception e) |
| { |
| fail("Error restarting the broker"); |
| } |
| } |
| } |
| |
| private void durabilityImplSessionPerConnection(int ackMode) throws Exception |
| { |
| Message msg; |
| // Create producer. |
| AMQConnection con0 = (AMQConnection) getConnection("guest", "guest"); |
| con0.start(); |
| Session session0 = con0.createSession(false, ackMode); |
| |
| AMQTopic topic = new AMQTopic(con0, "MyTopic"); |
| |
| Session sessionProd = con0.createSession(false, ackMode); |
| MessageProducer producer = sessionProd.createProducer(topic); |
| |
| // Create consumer 1. |
| AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); |
| con1.start(); |
| Session session1 = con1.createSession(false, ackMode); |
| |
| MessageConsumer consumer1 = session1.createConsumer(topic); |
| |
| // Create consumer 2. |
| AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); |
| con2.start(); |
| Session session2 = con2.createSession(false, ackMode); |
| |
| TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); |
| |
| // Send message and check that both consumers get it and only it. |
| producer.send(session0.createTextMessage("A")); |
| |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should be available", msg); |
| assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); |
| msg = consumer1.receive(500); |
| assertNull("There should be no more messages for consumption on consumer1.", msg); |
| |
| msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Message should have been received",msg); |
| assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); |
| msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("There should be no more messages for consumption on consumer2.", msg); |
| |
| // Send message and receive on consumer 1. |
| producer.send(session0.createTextMessage("B")); |
| |
| _logger.info("Receive message on consumer 1 :expecting B"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertEquals("B", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| // Detach the durable subscriber. |
| consumer2.close(); |
| session2.close(); |
| con2.close(); |
| |
| // Send message C and receive on consumer 1 |
| producer.send(session0.createTextMessage("C")); |
| |
| _logger.info("Receive message on consumer 1 :expecting C"); |
| msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertEquals("C", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 1 :expecting null"); |
| msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertEquals(null, msg); |
| |
| // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK) |
| // and also gets message C sent after it was disconnected. |
| AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); |
| con3.start(); |
| Session session3 = con3.createSession(false, ackMode); |
| |
| TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); |
| |
| if(ackMode == AMQSession.NO_ACKNOWLEDGE) |
| { |
| //Do nothing if NO_ACK was used, as prefetch means the message was dropped |
| //when we didn't call receive() to get it before closing consumer 2 |
| } |
| else |
| { |
| _logger.info("Receive message on consumer 3 :expecting B"); |
| msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull(msg); |
| assertEquals("B", ((TextMessage) msg).getText()); |
| } |
| |
| _logger.info("Receive message on consumer 3 :expecting C"); |
| msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Consumer 3 should get message 'C'.", msg); |
| assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText()); |
| _logger.info("Receive message on consumer 3 :expecting null"); |
| msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("There should be no more messages for consumption on consumer3.", msg); |
| |
| consumer1.close(); |
| consumer3.close(); |
| |
| session3.unsubscribe("MySubscription"); |
| |
| con0.close(); |
| con1.close(); |
| con3.close(); |
| } |
| |
| /** |
| * This tests the fix for QPID-1085 |
| * Creates a durable subscriber with an invalid selector, checks that the |
| * exception is thrown correctly and that the subscription is not created. |
| * @throws Exception |
| */ |
| public void testDurableWithInvalidSelector() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "MyTestDurableWithInvalidSelectorTopic"); |
| MessageProducer producer = session.createProducer(topic); |
| producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); |
| try |
| { |
| TopicSubscriber deadSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub", |
| "=TEST 'test", true); |
| assertNull("Subscriber should not have been created", deadSubscriber); |
| } |
| catch (JMSException e) |
| { |
| assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException); |
| } |
| TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub"); |
| assertNotNull("Subscriber should have been created", liveSubscriber); |
| |
| producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); |
| |
| Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull ("Message should have been received", msg); |
| assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); |
| assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); |
| liveSubscriber.close(); |
| session.unsubscribe("testDurableWithInvalidSelectorSub"); |
| } |
| |
| /** |
| * This tests the fix for QPID-1085 |
| * Creates a durable subscriber with an invalid destination, checks that the |
| * exception is thrown correctly and that the subscription is not created. |
| * @throws Exception |
| */ |
| public void testDurableWithInvalidDestination() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "testDurableWithInvalidDestinationTopic"); |
| try |
| { |
| TopicSubscriber deadSubscriber = session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub"); |
| assertNull("Subscriber should not have been created", deadSubscriber); |
| } |
| catch (InvalidDestinationException e) |
| { |
| // This was expected |
| } |
| MessageProducer producer = session.createProducer(topic); |
| producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); |
| |
| TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub"); |
| assertNotNull("Subscriber should have been created", liveSubscriber); |
| |
| producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); |
| Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull ("Message should have been received", msg); |
| assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); |
| assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); |
| |
| session.unsubscribe("testDurableWithInvalidDestinationsub"); |
| } |
| |
| /** |
| * Creates a durable subscription with a selector, then changes that selector on resubscription |
| * <p> |
| * QPID-1202, QPID-2418 |
| */ |
| public void testResubscribeWithChangedSelector() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector"); |
| MessageProducer producer = session.createProducer(topic); |
| |
| // Create durable subscriber that matches A |
| TopicSubscriber subA = session.createDurableSubscriber(topic, |
| "testResubscribeWithChangedSelector", |
| "Match = True", false); |
| |
| // Send 1 matching message and 1 non-matching message |
| sendMatchingAndNonMatchingMessage(session, producer); |
| |
| Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNotNull(rMsg); |
| assertEquals("Content was wrong", |
| "testResubscribeWithChangedSelector1", |
| ((TextMessage) rMsg).getText()); |
| |
| rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull(rMsg); |
| |
| // Disconnect subscriber |
| subA.close(); |
| |
| // Reconnect with new selector that matches B |
| TopicSubscriber subB = session.createDurableSubscriber(topic, |
| "testResubscribeWithChangedSelector","Match = False", false); |
| |
| //verify no messages are now recieved. |
| rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Should not have received message as the selector was changed", rMsg); |
| |
| // Check that new messages are received properly |
| sendMatchingAndNonMatchingMessage(session, producer); |
| rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); |
| |
| assertNotNull("Message should have been received", rMsg); |
| assertEquals("Content was wrong", |
| "testResubscribeWithChangedSelector2", |
| ((TextMessage) rMsg).getText()); |
| |
| |
| rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Message should not have been received",rMsg); |
| session.unsubscribe("testResubscribeWithChangedSelector"); |
| } |
| |
| public void testDurableSubscribeWithTemporaryTopic() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session ssn = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = ssn.createTemporaryTopic(); |
| try |
| { |
| ssn.createDurableSubscriber(topic, "test"); |
| fail("expected InvalidDestinationException"); |
| } |
| catch (InvalidDestinationException ex) |
| { |
| // this is expected |
| } |
| try |
| { |
| ssn.createDurableSubscriber(topic, "test", null, false); |
| fail("expected InvalidDestinationException"); |
| } |
| catch (InvalidDestinationException ex) |
| { |
| // this is expected |
| } |
| } |
| |
| private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException |
| { |
| TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1"); |
| msg.setBooleanProperty("Match", true); |
| producer.send(msg); |
| msg = session.createTextMessage("testResubscribeWithChangedSelector2"); |
| msg.setBooleanProperty("Match", false); |
| producer.send(msg); |
| } |
| |
| |
| /** |
| * create and register a durable subscriber with a message selector and then close it |
| * create a publisher and send 5 right messages and 5 wrong messages |
| * create another durable subscriber with the same selector and name |
| * check messages are still there |
| * <p> |
| * QPID-2418 |
| */ |
| public void testDurSubSameMessageSelector() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(true, Session.SESSION_TRANSACTED); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); |
| |
| //create and register a durable subscriber with a message selector and then close it |
| TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); |
| subOne.close(); |
| |
| MessageProducer producer = session.createProducer(topic); |
| for (int i = 0; i < 5; i++) |
| { |
| Message message = session.createMessage(); |
| message.setBooleanProperty("testprop", true); |
| producer.send(message); |
| message = session.createMessage(); |
| message.setBooleanProperty("testprop", false); |
| producer.send(message); |
| } |
| session.commit(); |
| producer.close(); |
| |
| // should be 5 or 10 messages on queue now |
| // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) |
| AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); |
| assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); |
| |
| // now recreate the durable subscriber and check the received messages |
| TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); |
| |
| for (int i = 0; i < 5; i++) |
| { |
| Message message = subTwo.receive(1000); |
| if (message == null) |
| { |
| fail("sameMessageSelector test failed. no message was returned"); |
| } |
| else |
| { |
| assertEquals("sameMessageSelector test failed. message selector not reset", |
| "true", message.getStringProperty("testprop")); |
| } |
| } |
| |
| session.commit(); |
| |
| // Check queue has no messages |
| if (isJavaBroker()) |
| { |
| assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); |
| } |
| else |
| { |
| assertTrue("At most the queue should have only 1 message", ((AMQSession<?, ?>) session).getQueueDepth(queue) <= 1); |
| } |
| |
| // Unsubscribe |
| session.unsubscribe("sameMessageSelector"); |
| |
| conn.close(); |
| } |
| |
| /** |
| * <ul> |
| * <li>create and register a durable subscriber with a message selector |
| * <li>create another durable subscriber with a different selector and same name |
| * <li>check first subscriber is now closed |
| * <li>create a publisher and send messages |
| * <li>check messages are received correctly |
| * </ul> |
| * <p> |
| * QPID-2418 |
| */ |
| public void testResubscribeWithChangedSelectorNoClose() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose"); |
| |
| // Create durable subscriber that matches A |
| TopicSubscriber subA = session.createDurableSubscriber(topic, |
| "testResubscribeWithChangedSelectorNoClose", |
| "Match = True", false); |
| |
| // Reconnect with new selector that matches B |
| TopicSubscriber subB = session.createDurableSubscriber(topic, |
| "testResubscribeWithChangedSelectorNoClose", |
| "Match = false", false); |
| |
| // First subscription has been closed |
| try |
| { |
| subA.receive(1000); |
| fail("First subscription was not closed"); |
| } |
| catch (Exception e) |
| { |
| e.printStackTrace(); |
| } |
| |
| conn.stop(); |
| |
| // Send 1 matching message and 1 non-matching message |
| MessageProducer producer = session.createProducer(topic); |
| TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); |
| msg.setBooleanProperty("Match", true); |
| producer.send(msg); |
| msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); |
| msg.setBooleanProperty("Match", false); |
| producer.send(msg); |
| |
| // should be 1 or 2 messages on queue now |
| // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) |
| AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); |
| assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); |
| |
| conn.start(); |
| |
| Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull(rMsg); |
| assertEquals("Content was wrong", |
| "testResubscribeWithChangedSelectorAndRestart2", |
| ((TextMessage) rMsg).getText()); |
| |
| rMsg = subB.receive(1000); |
| assertNull(rMsg); |
| |
| // Check queue has no messages |
| assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); |
| |
| conn.close(); |
| } |
| |
| /** |
| * <ul> |
| * <li>create and register a durable subscriber with no message selector |
| * <li>create another durable subscriber with a selector and same name |
| * <li>check first subscriber is now closed |
| * <li>create a publisher and send messages |
| * <li>check messages are recieved correctly |
| * </ul> |
| * <p> |
| * QPID-2418 |
| */ |
| public void testDurSubAddMessageSelectorNoClose() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); |
| |
| // create and register a durable subscriber with no message selector |
| TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false); |
| |
| // now create a durable subscriber with a selector |
| TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false); |
| |
| // First subscription has been closed |
| try |
| { |
| subOne.receive(1000); |
| fail("First subscription was not closed"); |
| } |
| catch (Exception e) |
| { |
| e.printStackTrace(); |
| } |
| |
| conn.stop(); |
| |
| // Send 1 matching message and 1 non-matching message |
| MessageProducer producer = session.createProducer(topic); |
| TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); |
| msg.setBooleanProperty("testprop", true); |
| producer.send(msg); |
| msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); |
| msg.setBooleanProperty("testprop", false); |
| producer.send(msg); |
| |
| // should be 1 or 2 messages on queue now |
| // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) |
| AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); |
| assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); |
| |
| conn.start(); |
| |
| Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT); |
| assertNotNull(rMsg); |
| assertEquals("Content was wrong", |
| "testResubscribeWithChangedSelectorAndRestart1", |
| ((TextMessage) rMsg).getText()); |
| |
| rMsg = subTwo.receive(1000); |
| assertNull(rMsg); |
| |
| // Check queue has no messages |
| assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); |
| |
| conn.close(); |
| } |
| |
| /** |
| * <ul> |
| * <li>create and register a durable subscriber with no message selector |
| * <li>try to create another durable with the same name, should fail |
| * </ul> |
| * <p> |
| * QPID-2418 |
| */ |
| public void testDurSubNoSelectorResubscribeNoClose() throws Exception |
| { |
| Connection conn = getConnection(); |
| conn.start(); |
| Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); |
| |
| // create and register a durable subscriber with no message selector |
| session.createDurableSubscriber(topic, "subscriptionName", null, false); |
| |
| // try to recreate the durable subscriber |
| try |
| { |
| session.createDurableSubscriber(topic, "subscriptionName", null, false); |
| fail("Subscription should not have been created"); |
| } |
| catch (Exception e) |
| { |
| e.printStackTrace(); |
| } |
| } |
| |
| /** |
| * Tests that a subscriber created on a same <i>session</i> as producer with |
| * no local true does not receive messages. |
| */ |
| public void testNoLocalOnSameSession() throws Exception |
| { |
| Connection connection = getConnection(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = session.createTopic(getTestQueueName()); |
| MessageProducer producer = session.createProducer(topic); |
| TopicSubscriber subscriber = null; |
| try |
| { |
| subscriber = session.createDurableSubscriber(topic, getTestName(), null, true); |
| connection.start(); |
| |
| producer.send(createNextMessage(session, 1)); |
| |
| Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Unexpected message received", m); |
| } |
| finally |
| { |
| session.unsubscribe(getTestName()); |
| } |
| } |
| |
| |
| /** |
| * Tests that a subscriber created on a same <i>connection</i> but separate |
| * <i>sessionM</i> as producer with no local true does not receive messages. |
| */ |
| public void testNoLocalOnSameConnection() throws Exception |
| { |
| Connection connection = getConnection(); |
| |
| Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(getTestQueueName()); |
| MessageProducer producer = producerSession.createProducer(topic); |
| |
| TopicSubscriber subscriber = null; |
| try |
| { |
| subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); |
| connection.start(); |
| |
| producer.send(createNextMessage(producerSession, 1)); |
| |
| Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Unexpected message received", m); |
| } |
| finally |
| { |
| consumerSession.unsubscribe(getTestName()); |
| } |
| } |
| |
| /** |
| * Tests that if no-local is in use, that the messages are delivered when |
| * the client reconnects. |
| * |
| * Currently fails on the Java Broker due to QPID-3605. |
| */ |
| public void testNoLocalMessagesNotDeliveredAfterReconnection() throws Exception |
| { |
| Connection connection = getConnection(); |
| |
| Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(getTestQueueName()); |
| MessageProducer producer = producerSession.createProducer(topic); |
| |
| TopicSubscriber subscriber = null; |
| try |
| { |
| subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); |
| connection.start(); |
| |
| producer.send(createNextMessage(producerSession, 1)); |
| |
| Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Unexpected message received", m); |
| |
| connection.close(); |
| |
| connection = getConnection(); |
| |
| consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); |
| connection.start(); |
| m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Message should not be received on a new connection", m); |
| } |
| finally |
| { |
| consumerSession.unsubscribe(getTestName()); |
| } |
| } |
| |
| /** |
| * Tests that messages are delivered normally to a subscriber on a separate connection despite |
| * the use of durable subscriber with no-local on the first connection. |
| */ |
| public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception |
| { |
| Connection noLocalConnection = getConnection(); |
| Connection connection = getConnection(); |
| |
| String noLocalSubId1 = getTestName() + "subId1"; |
| String subId = getTestName() + "subId2"; |
| |
| Session noLocalSession = noLocalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic noLocalTopic = noLocalSession.createTopic(getTestQueueName()); |
| |
| Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(getTestQueueName()); |
| |
| TopicSubscriber noLocalSubscriber = null; |
| TopicSubscriber subscriber = null; |
| try |
| { |
| MessageProducer producer = noLocalSession.createProducer(noLocalTopic); |
| noLocalSubscriber = noLocalSession.createDurableSubscriber(noLocalTopic, noLocalSubId1, null, true); |
| subscriber = consumerSession.createDurableSubscriber(topic, subId, null, true); |
| noLocalConnection.start(); |
| connection.start(); |
| |
| producer.send(createNextMessage(noLocalSession, 1)); |
| |
| Message m1 = noLocalSubscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNull("Subscriber on nolocal connection should not receive message", m1); |
| |
| Message m2 = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); |
| assertNotNull("Subscriber on non-nolocal connection should receive message", m2); |
| } |
| finally |
| { |
| noLocalSession.unsubscribe(noLocalSubId1); |
| consumerSession.unsubscribe(subId); |
| } |
| } |
| } |