| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.systest.destination; |
| |
| import static junit.framework.TestCase.assertNotNull; |
| import static org.apache.qpid.systest.core.util.Utils.INDEX; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.Assume.assumeThat; |
| |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Hashtable; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.QueueBrowser; |
| import javax.jms.QueueReceiver; |
| import javax.jms.QueueSession; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import javax.jms.Topic; |
| import javax.jms.TopicSession; |
| import javax.jms.TopicSubscriber; |
| import javax.naming.Context; |
| import javax.naming.InitialContext; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.AMQAnyDestination; |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQDestination; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.client.message.QpidMessageProperties; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; |
| import org.apache.qpid.messaging.Address; |
| import org.apache.qpid.systest.core.BrokerAdmin; |
| import org.apache.qpid.systest.core.JmsTestBase; |
| import org.apache.qpid.systest.core.util.Utils; |
| import org.apache.qpid.transport.ExecutionErrorCode; |
| |
| public class AddressBasedDestinationTest extends JmsTestBase |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(AddressBasedDestinationTest.class); |
| private Connection _connection; |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| _connection = getConnection(); |
| _connection.start(); |
| } |
| |
| @After |
| public void tearDown() throws Exception |
| { |
| if (_connection != null) |
| { |
| try |
| { |
| _connection.close(); |
| } |
| catch (JMSException e) |
| { |
| // ignore |
| } |
| } |
| } |
| |
| @Test |
| public void testCreateOptions() throws Exception |
| { |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // default (create never, assert never) ------------------- |
| // create never -------------------------------------------- |
| String addr1 = "ADDR:testQueue1"; |
| AMQDestination dest = new AMQAnyDestination(addr1); |
| try |
| { |
| jmsSession.createConsumer(dest); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| _connection = getConnection(); |
| jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| dest = new AMQAnyDestination(addr1); |
| } |
| |
| assertFalse("Queue should not be created", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| // create always ------------------------------------------- |
| addr1 = "ADDR:testQueue1; { create: always }"; |
| dest = new AMQAnyDestination(addr1); |
| jmsSession.createConsumer(dest); |
| |
| assertTrue("Queue not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, true)); |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest.getAddressName(), dest.getAddressName(), null)); |
| |
| // create receiver ----------------------------------------- |
| addr1 = "ADDR:testQueue2; { create: receiver }"; |
| dest = new AMQAnyDestination(addr1); |
| try |
| { |
| jmsSession.createProducer(dest); |
| fail("producer creation should fail"); |
| } |
| catch (JMSException e) |
| { |
| String expectedMessage = "The name 'testQueue2' supplied in the address " + |
| "doesn't resolve to an exchange or a queue"; |
| assertTrue(e.getCause().getMessage().contains(expectedMessage) |
| || e.getCause().getCause().getMessage().contains(expectedMessage)); |
| } |
| |
| assertFalse("Queue should not be created", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| jmsSession.createConsumer(dest); |
| |
| assertTrue("Queue not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, true)); |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest.getAddressName(), dest.getAddressName(), null)); |
| |
| // create never -------------------------------------------- |
| addr1 = "ADDR:testQueue3; { create: never }"; |
| dest = new AMQAnyDestination(addr1); |
| String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " + |
| "doesn't resolve to an exchange or a queue"; |
| try |
| { |
| jmsSession.createConsumer(dest); |
| fail("Consumer creation should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| } |
| |
| try |
| { |
| jmsSession.createProducer(dest); |
| fail("Producer creation should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| _connection = getConnection(); |
| jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| dest = new AMQAnyDestination(addr1); |
| } |
| |
| assertFalse("Queue should not be created", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| // create sender ------------------------------------------ |
| addr1 = "ADDR:testQueue3; { create: sender }"; |
| dest = new AMQAnyDestination(addr1); |
| |
| try |
| { |
| jmsSession.createConsumer(dest); |
| fail("Consumer creation should fail"); |
| } |
| catch (JMSException e) |
| { |
| // pass |
| _connection = getConnection(); |
| jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| dest = new AMQAnyDestination(addr1); |
| } |
| |
| assertFalse("Queue should not be created", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| jmsSession.createProducer(dest); |
| assertTrue("Queue not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, true)); |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest.getAddressName(), dest.getAddressName(), null)); |
| } |
| |
| @Test |
| public void testCreateQueue() throws Exception |
| { |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String addr = "ADDR:my-queue/hello; " + |
| "{" + |
| "create: always, " + |
| "node: " + |
| "{" + |
| "durable: true ," + |
| "x-declare: " + |
| "{" + |
| "exclusive: true," + |
| "arguments: {" + |
| "'qpid.alert_size': 1000," + |
| "'qpid.alert_count': 100" + |
| "}" + |
| "}, " + |
| "x-bindings: [{exchange : 'amq.direct', key : test}, " + |
| "{exchange : 'amq.fanout'}," + |
| "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + |
| "{exchange : 'amq.topic', key : 'a.#'}" + |
| "]," + |
| |
| "}" + |
| "}"; |
| AMQDestination dest = new AMQAnyDestination(addr); |
| MessageConsumer cons = jmsSession.createConsumer(dest); |
| cons.close(); |
| |
| // Even if the consumer is closed the queue and the bindings should be intact. |
| |
| assertTrue("Queue not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, true)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest.getAddressName(), dest.getAddressName(), null)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.direct", |
| dest.getAddressName(), "test", null)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.fanout", |
| dest.getAddressName(), null, null)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.topic", |
| dest.getAddressName(), "a.#", null)); |
| |
| Map<String, Object> args = new HashMap<String, Object>(); |
| args.put("x-match", "any"); |
| args.put("dep", "sales"); |
| args.put("loc", "CA"); |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.match", |
| dest.getAddressName(), null, args)); |
| |
| MessageProducer prod = jmsSession.createProducer(dest); |
| prod.send(jmsSession.createTextMessage("test")); |
| |
| MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue")); |
| Message m = cons2.receive(getReceiveTimeout()); |
| assertNotNull("Should receive message sent to my-queue", m); |
| assertEquals("The subject set in the message is incorrect", |
| "hello", |
| m.getStringProperty(QpidMessageProperties.QPID_SUBJECT)); |
| } |
| |
| @Test |
| public void testCreateExchange() throws Exception |
| { |
| createExchangeImpl(false, false, false); |
| } |
| |
| /** |
| * Verify creating an exchange via an Address, with supported |
| * exchange-declare arguments. |
| */ |
| @Test |
| public void testCreateExchangeWithArgs() throws Exception |
| { |
| assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true))); |
| createExchangeImpl(true, false, false); |
| } |
| |
| /** |
| * Verify that when creating an exchange via an Address, if a |
| * nonsense argument is specified the broker throws an execution |
| * exception back on the session with NOT_IMPLEMENTED status. |
| */ |
| @Test |
| public void testCreateExchangeWithNonsenseArgs() throws Exception |
| { |
| assumeThat("Broker-J is required", |
| getBrokerAdmin().getBrokerType(), |
| is(equalTo(BrokerAdmin.BrokerType.BROKERJ))); |
| |
| createExchangeImpl(true, true, false); |
| } |
| |
| |
| /** |
| * QPID-5816 ensure that a Destination used on a second connection is resolved again |
| * (creating the queue/exchange if necessary). |
| */ |
| @Test |
| public void testResolvedDestinationReresolvedBySecondConnection() throws Exception |
| { |
| Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| String addr = String.format("ADDR:%s; {create: always, node: {durable: false}}", getTestName()); |
| |
| Destination dest = session.createQueue(addr); |
| |
| MessageConsumer consumer = session.createConsumer(dest); |
| Utils.sendMessages(session, dest, 1); |
| Message m = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Should receive message sent to queue", m); |
| |
| _connection.close(); |
| |
| getBrokerAdmin().restart(); |
| |
| Connection connection = null; |
| try |
| { |
| connection = getConnection(); |
| connection.start(); |
| session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| // Queue should be recreated by re-resolution of the address |
| consumer = session.createConsumer(dest); |
| Utils.sendMessages(session, dest, 1); |
| m = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Should receive message sent to queue", m); |
| } |
| finally |
| { |
| if (connection != null) |
| { |
| connection.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void ensureQueueDestinationAlwaysResolved() throws Exception |
| { |
| String address = String.format("ADDR:%s; {create: always, node: {type: queue}}", getTestName()); |
| |
| Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| MessageProducer producer = session.createProducer(null); |
| producer.send(session.createQueue(address), session.createTextMessage("A")); |
| producer.send(session.createQueue(address), session.createTextMessage("B")); |
| session.commit(); |
| |
| MessageConsumer consumer = session.createConsumer(session.createQueue(address)); |
| Message messageA = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Message A is not received", messageA); |
| assertTrue("Unexpected type of message A", messageA instanceof TextMessage); |
| assertEquals("Unexpected content of message A", "A", ((TextMessage) messageA).getText()); |
| |
| Message messageB = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Message B is not received", messageB); |
| assertTrue("Unexpected type of message B", messageB instanceof TextMessage); |
| assertEquals("Unexpected content of message B", "B", ((TextMessage) messageB).getText()); |
| } |
| |
| |
| @Test |
| public void ensureTopicDestinationAlwaysResolved() throws Exception |
| { |
| String address = String.format("ADDR:amq.topic/%s; {node: {type: topic}}", getTestName()); |
| Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| MessageConsumer consumer = session.createConsumer(session.createTopic(address)); |
| |
| MessageProducer producer = session.createProducer(null); |
| producer.send(session.createTopic(address), session.createTextMessage("A")); |
| producer.send(session.createTopic(address), session.createTextMessage("B")); |
| session.commit(); |
| |
| Message messageA = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Message A is not received", messageA); |
| assertTrue("Unexpected type of message A", messageA instanceof TextMessage); |
| assertEquals("Unexpected content of message A", "A", ((TextMessage) messageA).getText()); |
| |
| Message messageB = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Message B is not received", messageB); |
| assertTrue("Unexpected type of message B", messageB instanceof TextMessage); |
| assertEquals("Unexpected content of message B", "B", ((TextMessage) messageB).getText()); |
| } |
| |
| private void createExchangeImpl(final boolean withExchangeArgs, |
| final boolean useNonsenseArguments, |
| final boolean useNonsenseExchangeType) throws Exception |
| { |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String addr = "ADDR:my-exchange/hello; " + |
| "{ " + |
| "create: always, " + |
| "node: " + |
| "{" + |
| "type: topic, " + |
| "x-declare: " + |
| "{ " + |
| "type:" + |
| (useNonsenseExchangeType ? "nonsense" : "direct") + |
| ", " + |
| "auto-delete: true" + |
| createExchangeArgsString(withExchangeArgs, useNonsenseArguments) + |
| "}" + |
| "}" + |
| "}"; |
| |
| AMQDestination dest = new AMQAnyDestination(addr); |
| |
| MessageConsumer cons; |
| try |
| { |
| cons = jmsSession.createConsumer(dest); |
| if (useNonsenseArguments || useNonsenseExchangeType) |
| { |
| fail("Expected execution exception during exchange declare did not occur"); |
| } |
| } |
| catch (JMSException e) |
| { |
| if (useNonsenseArguments && e.getCause() |
| .getMessage() |
| .contains(ExecutionErrorCode.NOT_IMPLEMENTED.toString())) |
| { |
| //expected because we used an argument which the broker doesn't have functionality |
| //for. We can't do the rest of the test as a result of the exception, just stop. |
| return; |
| } |
| else if (useNonsenseExchangeType && (e.getErrorCode().equals(String.valueOf(404)))) |
| { |
| return; |
| } |
| else if ((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010() |
| && String.valueOf(503).equals(e.getErrorCode())) |
| { |
| return; |
| } |
| else |
| { |
| fail("Unexpected exception whilst creating consumer: " + e); |
| } |
| } |
| |
| assertTrue("Exchange not created as expected", ( |
| (AMQSession) jmsSession).isExchangeExist(dest, true)); |
| |
| // The existence of the queue is implicitly tested here |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("my-exchange", |
| dest.getQueueName(), "hello", null)); |
| |
| // The client should be able to query and verify the existence of my-exchange (QPID-2774) |
| dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); |
| cons = jmsSession.createConsumer(dest); |
| } |
| |
| private String createExchangeArgsString(final boolean withExchangeArgs, |
| final boolean useNonsenseArguments) |
| { |
| String argsString; |
| |
| if (withExchangeArgs && useNonsenseArguments) |
| { |
| argsString = ", arguments: {" + |
| "'abcd.1234.wxyz': 1, " + |
| "}"; |
| } |
| else if (withExchangeArgs) |
| { |
| argsString = ", arguments: {" + |
| "'qpid.msg_sequence': 1, " + |
| "'qpid.ive': 1" + |
| "}"; |
| } |
| else |
| { |
| argsString = ""; |
| } |
| |
| return argsString; |
| } |
| |
| public void checkQueueForBindings(Session jmsSession, AMQDestination dest, String headersBinding) throws Exception |
| { |
| assertTrue("Queue not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, true)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest.getAddressName(), dest.getAddressName(), null)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.direct", |
| dest.getAddressName(), "test", null)); |
| |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.topic", |
| dest.getAddressName(), "a.#", null)); |
| |
| Address a = Address.parse(headersBinding); |
| assertTrue("Queue not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("amq.match", |
| dest.getAddressName(), null, a.getOptions())); |
| } |
| |
| /** |
| * Test goal: Verifies that a producer and consumer creation triggers the correct |
| * behavior for x-bindings specified in node props. |
| */ |
| @Test |
| public void testBindQueueWithArgs() throws Exception |
| { |
| |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; |
| |
| String addr = "node: " + |
| "{" + |
| "durable: true ," + |
| "x-declare: " + |
| "{ " + |
| "auto-delete: true," + |
| "arguments: {'qpid.alert_count': 100}" + |
| "}, " + |
| "x-bindings: [{exchange : 'amq.direct', key : test}, " + |
| "{exchange : 'amq.topic', key : 'a.#'}," + |
| headersBinding + |
| "]" + |
| "}" + |
| "}"; |
| |
| AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " + addr); |
| MessageConsumer cons = jmsSession.createConsumer(dest1); |
| checkQueueForBindings(jmsSession, dest1, headersBinding); |
| |
| AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " + addr); |
| MessageProducer prod = jmsSession.createProducer(dest2); |
| checkQueueForBindings(jmsSession, dest2, headersBinding); |
| } |
| |
| @Test |
| public void testZeroCapacityForSynchronousReceive() throws Exception |
| { |
| assumeThat("Only supported by AMQP 0-10", getProtocol(), is(equalTo("0-10"))); |
| |
| Session session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Session session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String addressString = "ADDR:my-queue; {create: always, link:{capacity: 0}}"; |
| Queue session1queue = session1.createQueue(addressString); |
| Queue session2queue = session1.createQueue(addressString); |
| MessageConsumer consumer1 = session1.createConsumer(session1queue); |
| MessageConsumer consumer1withSelector = session1.createConsumer(session1queue, "key1 = 1"); |
| MessageConsumer consumer2withSelector = session2.createConsumer(session2queue, "key2 = 2"); |
| |
| _connection.start(); |
| |
| MessageProducer producer = session1.createProducer(session1queue); |
| |
| Message m = session1.createMessage(); |
| m.setIntProperty("key1", 1); |
| m.setIntProperty("key2", 2); |
| producer.send(m); |
| |
| m = session1.createMessage(); |
| m.setIntProperty("key1", 1); |
| producer.send(m); |
| |
| m = session1.createMessage(); |
| producer.send(m); |
| |
| m = session1.createMessage(); |
| m.setIntProperty("key2", 2); |
| producer.send(m); |
| |
| assertNotNull("First message from queue should be received", consumer1withSelector.receive(getReceiveTimeout())); |
| assertNotNull("Last message on queue should be received", consumer2withSelector.receive(getReceiveTimeout())); |
| assertNotNull("Second message from queue should be received", consumer1.receive(getReceiveTimeout())); |
| assertNull("Only message remaining shouldn't match selector", consumer1withSelector.receive(getReceiveTimeout()/4)); |
| assertNotNull("Should have been one message remaining on queue", consumer1.receive(getReceiveTimeout())); |
| assertNull("No messages should be remaining on queue", consumer1.receive(getReceiveTimeout()/4)); |
| } |
| |
| /** |
| * Test goal: Verifies the capacity property in address string is handled properly. |
| * Test strategy: |
| * Creates a destination with capacity 10. |
| * Creates consumer with client ack. |
| * Sends 15 messages to the queue, tries to receive 10. |
| * Tries to receive the 11th message and checks if its null. |
| * <p> |
| * Since capacity is 10 and we haven't acked any messages, |
| * we should not have received the 11th. |
| * <p> |
| * Acks the 10th message and verifies we receive the rest of the msgs. |
| */ |
| @Test |
| public void testCapacity() throws Exception |
| { |
| verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}"); |
| } |
| |
| @Test |
| public void testSourceAndTargetCapacity() throws Exception |
| { |
| verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}"); |
| } |
| |
| private void verifyCapacity(String address) throws Exception |
| { |
| if (!isCppBroker()) |
| { |
| LOGGER.info("Not C++ broker, exiting test"); |
| return; |
| } |
| |
| Session jmsSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| |
| AMQDestination dest = new AMQAnyDestination(address); |
| MessageConsumer cons = jmsSession.createConsumer(dest); |
| MessageProducer prod = jmsSession.createProducer(dest); |
| |
| for (int i = 0; i < 15; i++) |
| { |
| prod.send(jmsSession.createTextMessage("msg" + i)); |
| } |
| Message msg = null; |
| for (int i = 0; i < 10; i++) |
| { |
| msg = cons.receive(getReceiveTimeout()); |
| assertNotNull("Should have received " + i + " message", msg); |
| assertEquals("Unexpected message received", "msg" + i, ((TextMessage) msg).getText()); |
| } |
| assertNull("Shouldn't have received the 11th message as capacity is 10", cons.receive(getReceiveTimeout())); |
| msg.acknowledge(); |
| for (int i = 11; i < 16; i++) |
| { |
| assertNotNull("Should have received the " + i + "th message as we acked the last 10", |
| cons.receive(getReceiveTimeout())); |
| } |
| } |
| |
| /** |
| * Test goal: Verifies if the new address format based destinations |
| * can be specified and loaded correctly from the properties file. |
| */ |
| @Test |
| public void testLoadingFromPropertiesFile() throws Exception |
| { |
| Hashtable<String, String> map = new Hashtable<String, String>(); |
| map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + |
| "{x-declare: {auto-delete: true, arguments : {'qpid.alert_size': 1000}}}}"); |
| |
| map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); |
| |
| map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'"); |
| |
| PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); |
| Context ctx = props.getInitialContext(map); |
| |
| AMQDestination dest1 = (AMQDestination) ctx.lookup("myQueue1"); |
| AMQDestination dest2 = (AMQDestination) ctx.lookup("myQueue2"); |
| AMQDestination dest3 = (AMQDestination) ctx.lookup("myQueue3"); |
| |
| Session jmsSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageConsumer cons1 = jmsSession.createConsumer(dest1); |
| MessageConsumer cons2 = jmsSession.createConsumer(dest2); |
| MessageConsumer cons3 = jmsSession.createConsumer(dest3); |
| |
| assertTrue("Destination1 was not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest1, true)); |
| |
| assertTrue("Destination1 was not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest1.getAddressName(), dest1.getAddressName(), null)); |
| |
| assertTrue("Destination2 was not created as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest2, true)); |
| |
| assertTrue("Destination2 was not bound as expected", ( |
| (AMQSession) jmsSession).isQueueBound("", |
| dest2.getAddressName(), dest2.getAddressName(), null)); |
| |
| MessageProducer producer = jmsSession.createProducer(dest3); |
| producer.send(jmsSession.createTextMessage("Hello")); |
| TextMessage msg = (TextMessage) cons3.receive(getReceiveTimeout()); |
| assertEquals("Destination3 was not created as expected.", msg.getText(), "Hello"); |
| } |
| |
| /** |
| * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. |
| * Test strategy: Creates and address with a default subject "topic1" |
| * Creates a message with "qpid.subject"="topic2" and sends it. |
| * Verifies that the message goes to "topic2" instead of "topic1". |
| */ |
| @Test |
| public void testOverridingSubject() throws Exception |
| { |
| Session jmsSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| |
| AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); |
| |
| MessageProducer prod = jmsSession.createProducer(topic1); |
| |
| Message m = jmsSession.createTextMessage("Hello"); |
| m.setStringProperty("qpid.subject", "topic2"); |
| |
| MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); |
| MessageConsumer consForTopic2 = |
| jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); |
| |
| prod.send(m); |
| Message msg = consForTopic1.receive(getReceiveTimeout()); |
| assertNull("message shouldn't have been sent to topic1", msg); |
| |
| msg = consForTopic2.receive(getReceiveTimeout()); |
| assertNotNull("message should have been sent to topic2", msg); |
| } |
| |
| /** |
| * Test goal: Verifies that session.createQueue method |
| * works as expected both with the new and old addressing scheme. |
| */ |
| @Test |
| public void testSessionCreateQueue() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Using the BURL method |
| Destination queue = ssn.createQueue("BURL:direct://amq.direct/my-queue/my-queue"); |
| MessageProducer prod = ssn.createProducer(queue); |
| MessageConsumer cons = ssn.createConsumer(queue); |
| assertTrue("my-queue was not created as expected", ( |
| (AMQSession) ssn).isQueueBound("amq.direct", |
| "my-queue", "my-queue", null)); |
| |
| prod.send(ssn.createTextMessage("test")); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| |
| // Using the ADDR method |
| // default case |
| queue = ssn.createQueue("ADDR:my-queue2 ; { assert : sender }"); |
| try |
| { |
| prod = ssn.createProducer(queue); |
| fail("The client should throw an exception, since there is no queue present in the broker"); |
| } |
| catch (Exception e) |
| { |
| String s = "Assert failed for queue : my-queue2"; |
| assertTrue(e.getCause().getMessage().contains(s) || e.getCause().getCause().getMessage().contains(s)); |
| } |
| |
| // explicit create case |
| queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); |
| prod = ssn.createProducer(queue); |
| cons = ssn.createConsumer(queue); |
| assertTrue("my-queue2 was not created as expected", ( |
| (AMQSession) ssn).isQueueBound("", |
| "my-queue2", "my-queue2", null)); |
| |
| prod.send(ssn.createTextMessage("test")); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| |
| // Using the ADDR method to create a more complicated queue |
| String addr = "ADDR:amq.direct/x512; {" + |
| "link : {name : 'MY.RESP.QUEUE', " + |
| "x-declare : { auto-delete: true, exclusive: true, " + |
| "arguments : {'qpid.alert_size': 1000, 'qpid.policy_type': ring} } } }"; |
| queue = ssn.createQueue(addr); |
| |
| cons = ssn.createConsumer(queue); |
| prod = ssn.createProducer(queue); |
| assertTrue("MY.RESP.QUEUE was not created as expected", ( |
| (AMQSession) ssn).isQueueBound("amq.direct", |
| "MY.RESP.QUEUE", "x512", null)); |
| cons.close(); |
| } |
| |
| /** |
| * Test goal: Verifies that session.creatTopic method works as expected |
| * both with the new and old addressing scheme. |
| */ |
| @Test |
| public void testSessionCreateTopic() throws Exception |
| { |
| sessionCreateTopicImpl(false); |
| } |
| |
| /** |
| * Test goal: Verifies that session.creatTopic method works as expected |
| * both with the new and old addressing scheme when adding exchange arguments. |
| */ |
| @Test |
| public void testSessionCreateTopicWithExchangeArgs() throws Exception |
| { |
| assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true))); |
| sessionCreateTopicImpl(true); |
| } |
| |
| private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Using the BURL method |
| Topic topic = ssn.createTopic("ACME"); |
| MessageProducer prod = ssn.createProducer(topic); |
| MessageConsumer cons = ssn.createConsumer(topic); |
| |
| prod.send(ssn.createTextMessage("test")); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| |
| // Using the ADDR method |
| topic = ssn.createTopic("ADDR:ACME"); |
| prod = ssn.createProducer(topic); |
| cons = ssn.createConsumer(topic); |
| |
| prod.send(ssn.createTextMessage("test")); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| |
| String addr = "ADDR:vehicles/bus; " + |
| "{ " + |
| "create: always, " + |
| "node: " + |
| "{" + |
| "type: topic, " + |
| "x-declare: " + |
| "{ " + |
| "type:direct, " + |
| "auto-delete: true" + |
| createExchangeArgsString(withExchangeArgs, false) + |
| "}" + |
| "}, " + |
| "link: {name : my-topic, " + |
| "x-bindings: [{exchange : 'vehicles', key : car}, " + |
| "{exchange : 'vehicles', key : van}]" + |
| "}" + |
| "}"; |
| |
| // Using the ADDR method to create a more complicated topic |
| topic = ssn.createTopic(addr); |
| cons = ssn.createConsumer(topic); |
| prod = ssn.createProducer(topic); |
| |
| assertTrue("The queue was not bound to vehicle exchange using bus as the binding key", ( |
| (AMQSession) ssn).isQueueBound("vehicles", |
| "my-topic", "bus", null)); |
| |
| assertTrue("The queue was not bound to vehicle exchange using car as the binding key", ( |
| (AMQSession) ssn).isQueueBound("vehicles", |
| "my-topic", "car", null)); |
| |
| assertTrue("The queue was not bound to vehicle exchange using van as the binding key", ( |
| (AMQSession) ssn).isQueueBound("vehicles", |
| "my-topic", "van", null)); |
| |
| Message msg = ssn.createTextMessage("test"); |
| msg.setStringProperty("qpid.subject", "van"); |
| prod.send(msg); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| } |
| |
| /** |
| * Test Goal : Verify the default subjects used for each exchange type. |
| * The default for amq.topic is "#" and for the rest it's "" |
| */ |
| @Test |
| public void testDefaultSubjects() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); |
| MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); |
| |
| MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); |
| MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); |
| MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); |
| |
| queueProducer.send(ssn.createBytesMessage()); |
| assertNotNull("The consumer subscribed to amq.direct " + |
| "with empty binding key should have received the message ", queueCons.receive(getReceiveTimeout())); |
| |
| topicProducer1.send(ssn.createTextMessage("25c")); |
| assertEquals("The consumer subscribed to amq.topic " + |
| "with '#' binding key should have received the message ", |
| ((TextMessage) topicCons.receive(getReceiveTimeout())).getText(), "25c"); |
| |
| topicProducer2.send(ssn.createTextMessage("1000")); |
| assertEquals("The consumer subscribed to amq.topic " + |
| "with '#' binding key should have received the message ", |
| ((TextMessage) topicCons.receive(getReceiveTimeout())).getText(), "1000"); |
| } |
| |
| /** |
| * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer. |
| * This indirectly tests ring queues as well. |
| */ |
| @Test |
| public void testBrowseMode() throws Exception |
| { |
| |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " + |
| "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + |
| "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}"; |
| |
| Destination dest = ssn.createQueue(addr); |
| MessageConsumer browseCons = ssn.createConsumer(dest); |
| MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); |
| |
| prod.send(ssn.createTextMessage("Test1")); |
| prod.send(ssn.createTextMessage("Test2")); |
| |
| TextMessage msg = (TextMessage) browseCons.receive(getReceiveTimeout()); |
| assertNotNull("Didn't receive the first message", msg); |
| assertEquals("Unexpected first message", msg.getText(), "Test1"); |
| |
| msg = (TextMessage) browseCons.receive(getReceiveTimeout()); |
| assertNotNull("Didn't receive the second message", msg); |
| assertEquals("Unexpected second message", msg.getText(), "Test2"); |
| |
| browseCons.close(); |
| prod.send(ssn.createTextMessage("Test3")); |
| browseCons = ssn.createConsumer(dest); |
| |
| msg = (TextMessage) browseCons.receive(getReceiveTimeout()); |
| assertEquals("Should receive the second message again", msg.getText(), "Test2"); |
| |
| msg = (TextMessage) browseCons.receive(getReceiveTimeout()); |
| assertEquals("Should receive the third message since it's a ring queue", msg.getText(), "Test3"); |
| |
| assertNull("Should not receive anymore messages", browseCons.receive(getReceiveTimeout()/4)); |
| } |
| |
| /** |
| * Test Goal : When the same destination is used when creating two consumers, |
| * If the type == topic, verify that unique subscription queues are created, |
| * unless subscription queue has a name. |
| * <p> |
| * If the type == queue, same queue should be shared. |
| */ |
| @Test |
| public void testSubscriptionForSameDestination() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); |
| MessageConsumer consumer1 = ssn.createConsumer(dest); |
| MessageConsumer consumer2 = ssn.createConsumer(dest); |
| MessageProducer prod = ssn.createProducer(dest); |
| |
| prod.send(ssn.createTextMessage("A")); |
| TextMessage m = (TextMessage) consumer1.receive(getReceiveTimeout()); |
| assertEquals("Consumer1 should receive message A", m.getText(), "A"); |
| m = (TextMessage) consumer2.receive(getReceiveTimeout()); |
| assertEquals("Consumer2 should receive message A", m.getText(), "A"); |
| |
| consumer1.close(); |
| consumer2.close(); |
| |
| dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}"); |
| consumer1 = ssn.createConsumer(dest); |
| try |
| { |
| consumer2 = ssn.createConsumer(dest); |
| fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); |
| } |
| catch (Exception e) |
| { |
| } |
| } |
| |
| @Test |
| public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception |
| { |
| assumeThat("Only supported by AMQP 0-10", getProtocol(), is(equalTo("0-10"))); |
| |
| _connection = getConnection(); |
| _connection.start(); |
| final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}"); |
| final MessageConsumer consumer1 = ssn.createConsumer(dest); |
| final MessageConsumer consumer2 = ssn.createConsumer(dest); |
| final MessageProducer prod = ssn.createProducer(dest); |
| |
| prod.send(ssn.createTextMessage("A")); |
| Message m1 = consumer1.receive(getReceiveTimeout()); |
| Message m2 = consumer2.receive(getReceiveTimeout()); |
| |
| if (m1 != null) |
| { |
| assertNull("Only one consumer should receive the message", m2); |
| } |
| else |
| { |
| assertNotNull("Only one consumer should receive the message", m2); |
| } |
| } |
| |
| @Test |
| public void testXBindingsWithoutExchangeName() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String addr = "ADDR:MRKT; " + |
| "{" + |
| "create: receiver," + |
| "node : {type: topic, x-declare: {type: topic} }," + |
| "link:{" + |
| "name: my-topic," + |
| "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + |
| "}" + |
| "}"; |
| |
| // Using the ADDR method to create a more complicated topic |
| Topic topic = ssn.createTopic(addr); |
| MessageConsumer cons = ssn.createConsumer(topic); |
| |
| assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key", ( |
| (AMQSession) ssn).isQueueBound("MRKT", |
| "my-topic", "NYSE.#", null)); |
| |
| assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key", ( |
| (AMQSession) ssn).isQueueBound("MRKT", |
| "my-topic", "NASDAQ.#", null)); |
| |
| assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key", ( |
| (AMQSession) ssn).isQueueBound("MRKT", |
| "my-topic", "CNTL.#", null)); |
| |
| MessageProducer prod = ssn.createProducer(topic); |
| Message msg = ssn.createTextMessage("test"); |
| msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); |
| prod.send(msg); |
| assertNotNull("consumer should receive a message", cons.receive(getReceiveTimeout())); |
| cons.close(); |
| } |
| |
| @Test |
| public void testXSubscribeOverrides() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String str = |
| "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; |
| Destination dest = ssn.createTopic(str); |
| MessageConsumer consumer1 = ssn.createConsumer(dest); |
| try |
| { |
| MessageConsumer consumer2 = ssn.createConsumer(dest); |
| fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); |
| } |
| catch (Exception e) |
| { |
| } |
| } |
| |
| @Test |
| public void testQueueReceiversAndTopicSubscriber() throws Exception |
| { |
| Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); |
| Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); |
| |
| QueueSession qSession = ((AMQConnection) _connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); |
| QueueReceiver receiver = qSession.createReceiver(queue); |
| |
| TopicSession tSession = ((AMQConnection) _connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); |
| TopicSubscriber sub = tSession.createSubscriber(topic); |
| |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); |
| prod1.send(ssn.createTextMessage("test1")); |
| |
| MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); |
| prod2.send(ssn.createTextMessage("test2")); |
| |
| Message msg1 = receiver.receive(getReceiveTimeout()); |
| assertNotNull(msg1); |
| assertEquals("test1", ((TextMessage) msg1).getText()); |
| |
| Message msg2 = sub.receive(getReceiveTimeout()); |
| assertNotNull(msg2); |
| assertEquals("test2", ((TextMessage) msg2).getText()); |
| } |
| |
| @Test |
| public void testDurableSubscriber() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; |
| |
| Properties props = new Properties(); |
| props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); |
| props.setProperty("destination.address1", "ADDR:amq.topic/test"); |
| props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); |
| props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); |
| String addrStr = |
| "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; |
| props.setProperty("destination.address5", addrStr); |
| |
| Context ctx = new InitialContext(props); |
| try |
| { |
| for (int i = 1; i < 4; i++) |
| { |
| Topic topic = (Topic) ctx.lookup("address" + i); |
| createDurableSubscriber(ctx, ssn, "address" + i, topic, "ADDR:amq.topic/test"); |
| } |
| |
| Topic topic = ssn.createTopic("ADDR:news.us"); |
| createDurableSubscriber(ctx, ssn, "my-dest", topic, "ADDR:news.us"); |
| |
| Topic namedQueue = (Topic) ctx.lookup("address5"); |
| try |
| { |
| createDurableSubscriber(ctx, ssn, "my-queue", namedQueue, "ADDR:amq.topic/test"); |
| fail("Exception should be thrown. Durable subscribers cannot be created for Queues"); |
| } |
| catch (JMSException e) |
| { |
| assertEquals("Durable subscribers can only be created for Topics", |
| e.getMessage()); |
| } |
| } |
| finally |
| { |
| ctx.close(); |
| } |
| } |
| |
| @Test |
| public void testDurableSubscription() throws Exception |
| { |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = session.createTopic("ADDR:amq.topic/" + getTestName()); |
| MessageProducer publisher = session.createProducer(topic); |
| MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestName()); |
| |
| TextMessage messageToSend = session.createTextMessage("Test0"); |
| publisher.send(messageToSend); |
| ((AMQSession<?, ?>) session).sync(); |
| |
| Message receivedMessage = subscriber.receive(getReceiveTimeout()); |
| assertNotNull("Message has not been received", receivedMessage); |
| assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage) receivedMessage).getText()); |
| |
| subscriber.close(); |
| |
| messageToSend = session.createTextMessage("Test1"); |
| publisher.send(messageToSend); |
| ((AMQSession<?, ?>) session).sync(); |
| |
| subscriber = session.createDurableSubscriber(topic, getTestName()); |
| receivedMessage = subscriber.receive(getReceiveTimeout()); |
| assertNotNull("Message has not been received", receivedMessage); |
| assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage) receivedMessage).getText()); |
| } |
| |
| @Test |
| public void testDurableSubscriptionnWithSelector() throws Exception |
| { |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = session.createTopic("ADDR:amq.topic/" + getTestName()); |
| MessageProducer publisher = session.createProducer(topic); |
| MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestName(), "id=1", false); |
| |
| TextMessage messageToSend = session.createTextMessage("Test0"); |
| messageToSend.setIntProperty("id", 1); |
| publisher.send(messageToSend); |
| ((AMQSession<?, ?>) session).sync(); |
| |
| Message receivedMessage = subscriber.receive(getReceiveTimeout()); |
| assertNotNull("Message has not been received", receivedMessage); |
| assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage) receivedMessage).getText()); |
| assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); |
| |
| subscriber.close(); |
| |
| messageToSend = session.createTextMessage("Test1"); |
| messageToSend.setIntProperty("id", 1); |
| publisher.send(messageToSend); |
| ((AMQSession<?, ?>) session).sync(); |
| |
| subscriber = session.createDurableSubscriber(topic, getTestName(), "id=1", false); |
| receivedMessage = subscriber.receive(getReceiveTimeout()); |
| assertNotNull("Message has not been received", receivedMessage); |
| assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage) receivedMessage).getText()); |
| assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); |
| } |
| |
| private void createDurableSubscriber(Context ctx, Session ssn, String destName, Topic topic, String producerAddr) |
| throws Exception |
| { |
| MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); |
| MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); |
| |
| Message m = ssn.createTextMessage(destName); |
| prod.send(m); |
| Message msg = cons.receive(getReceiveTimeout()); |
| assertNotNull("Message not received as expected when using Topic : " + topic, msg); |
| assertEquals(destName, ((TextMessage) msg).getText()); |
| ssn.unsubscribe(destName); |
| } |
| |
| @Test |
| public void testDeleteOptions() throws Exception |
| { |
| assumeThat("Broker-J is required", |
| getBrokerAdmin().getBrokerType(), |
| is(equalTo(BrokerAdmin.BrokerType.BROKERJ))); |
| |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // default (create never, assert never) ------------------- |
| // create never -------------------------------------------- |
| String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; |
| AMQDestination dest = new AMQAnyDestination(addr1); |
| try |
| { |
| jmsSession.createConsumer(dest).close(); |
| } |
| catch (JMSException e) |
| { |
| fail("Exception should not be thrown. Exception thrown is : " + e); |
| } |
| |
| assertFalse("Queue not deleted as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; |
| dest = new AMQAnyDestination(addr2); |
| try |
| { |
| jmsSession.createConsumer(dest).close(); |
| } |
| catch (JMSException e) |
| { |
| fail("Exception should not be thrown. Exception thrown is : " + e); |
| } |
| |
| assertFalse("Queue not deleted as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| |
| String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; |
| dest = new AMQAnyDestination(addr3); |
| try |
| { |
| jmsSession.createConsumer(dest); |
| MessageProducer prod = jmsSession.createProducer(dest); |
| prod.close(); |
| } |
| catch (JMSException e) |
| { |
| fail("Exception should not be thrown. Exception thrown is : " + e); |
| } |
| |
| assertFalse("Queue not deleted as expected", ( |
| (AMQSession) jmsSession).isQueueExist(dest, false)); |
| } |
| |
| /** |
| * Test Goals : 1. Test if the client sets the correct accept mode for unreliable |
| * and at-least-once. |
| * 2. Test default reliability modes for Queues and Topics. |
| * 3. Test if an exception is thrown if exactly-once is used. |
| * 4. Test if an exception is thrown if at-least-once is used with topics. |
| * <p> |
| * Test Strategy: For goal #1 & #2 |
| * For unreliable and at-least-once the test tries to receives messages |
| * in client_ack mode but does not ack the messages. |
| * It will then close the session, recreate a new session |
| * and will then try to verify the queue depth. |
| * For unreliable the messages should have been taken off the queue. |
| * For at-least-once the messages should be put back onto the queue. |
| */ |
| @Test |
| public void testReliabilityOptions() throws Exception |
| { |
| String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; |
| acceptModeTest(addr1, 0); |
| |
| String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; |
| acceptModeTest(addr2, 2); |
| |
| // Default accept-mode for topics |
| acceptModeTest("ADDR:amq.topic/test", 0); |
| |
| // Default accept-mode for queues |
| acceptModeTest("ADDR:testQueue1;{create: always}", 2); |
| |
| String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; |
| try |
| { |
| AMQAnyDestination dest = new AMQAnyDestination(addr3); |
| fail("An exception should be thrown indicating it's an unsupported type"); |
| } |
| catch (Exception e) |
| { |
| assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); |
| } |
| } |
| |
| private void acceptModeTest(String address, int expectedQueueDepth) throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageConsumer cons; |
| MessageProducer prod; |
| |
| AMQDestination dest = new AMQAnyDestination(address); |
| cons = ssn.createConsumer(dest); |
| prod = ssn.createProducer(dest); |
| |
| for (int i = 0; i < expectedQueueDepth; i++) |
| { |
| prod.send(ssn.createTextMessage("Msg" + i)); |
| } |
| |
| for (int i = 0; i < expectedQueueDepth; i++) |
| { |
| Message msg = cons.receive(getReceiveTimeout()); |
| assertNotNull(msg); |
| assertEquals("Msg" + i, ((TextMessage) msg).getText()); |
| } |
| |
| ssn.close(); |
| ssn = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); |
| assertEquals(expectedQueueDepth, queueDepth); |
| cons.close(); |
| prod.close(); |
| } |
| |
| @Test |
| public void testDestinationOnSend() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); |
| MessageProducer prod = ssn.createProducer(null); |
| |
| Topic queue = ssn.createTopic("ADDR:amq.topic/test"); |
| prod.send(queue, ssn.createTextMessage("A")); |
| |
| Message msg = cons.receive(getReceiveTimeout()); |
| assertNotNull(msg); |
| assertEquals("A", ((TextMessage) msg).getText()); |
| prod.close(); |
| cons.close(); |
| } |
| |
| @Test |
| public void testReplyToWithNamelessExchange() throws Exception |
| { |
| System.setProperty("qpid.declare_exchanges", "false"); |
| try |
| { |
| replyToTest("ADDR:my-queue;{create: always}"); |
| } |
| finally |
| { |
| System.clearProperty("qpid.declare_exchanges"); |
| } |
| } |
| |
| @Test |
| public void testReplyToWithCustomExchange() throws Exception |
| { |
| replyToTest("ADDR:hello;{create:always,node:{type:topic}}"); |
| } |
| |
| private void replyToTest(String replyTo) throws Exception |
| { |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Destination replyToDest = AMQDestination.createDestination(replyTo, false); |
| MessageConsumer replyToCons = session.createConsumer(replyToDest); |
| |
| Destination dest = session.createQueue("ADDR:amq.direct/test"); |
| |
| MessageConsumer cons = session.createConsumer(dest); |
| MessageProducer prod = session.createProducer(dest); |
| Message m = session.createTextMessage("test"); |
| m.setJMSReplyTo(replyToDest); |
| prod.send(m); |
| |
| Message msg = cons.receive(getReceiveTimeout()); |
| MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); |
| prodR.send(session.createTextMessage("x")); |
| |
| Message m1 = replyToCons.receive(getReceiveTimeout()); |
| assertNotNull("The reply to consumer should have received the messsage", m1); |
| } |
| |
| @Test |
| public void testAltExchangeInAddressString() throws Exception |
| { |
| String addr1 = |
| "ADDR:my-exchange/test; {create: always, node:{type: topic,x-declare:{alternate-exchange:'amq.fanout'}}}"; |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String altQueueAddr = |
| "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'amq.fanout'}] }}"; |
| MessageConsumer cons = session.createConsumer(session.createQueue(altQueueAddr)); |
| |
| MessageProducer prod = session.createProducer(session.createTopic(addr1)); |
| prod.send(session.createMessage()); |
| prod.close(); |
| assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message", |
| cons.receive(getReceiveTimeout())); |
| |
| String addr2 = |
| "ADDR:test-queue;{create:sender, delete: sender,node:{type:queue,x-declare:{alternate-exchange:'amq.fanout'}}}"; |
| prod = session.createProducer(session.createTopic(addr2)); |
| prod.send(session.createMessage()); |
| prod.close(); |
| assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message", |
| cons.receive(getReceiveTimeout())); |
| cons.close(); |
| } |
| |
| @Test |
| public void testUnknownAltExchange() throws Exception |
| { |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String altQueueAddr = |
| "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'doesnotexist'}] }}"; |
| try |
| { |
| session.createConsumer(session.createQueue(altQueueAddr)); |
| fail("Attempt to create a queue with an unknown alternate exchange should fail"); |
| } |
| catch (JMSException e) |
| { |
| assertEquals("Failure code is not as expected", "404", e.getErrorCode()); |
| } |
| } |
| |
| @Test |
| public void testUnknownNodeDetectedByProducerAndConsumer() throws Exception |
| { |
| Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("ADDR: unknown-node"); |
| String expectedMessage = "The name 'unknown-node' supplied in the " |
| + "address doesn't resolve to an exchange or a queue"; |
| try |
| { |
| session.createConsumer(queue); |
| fail("Attempt to create a consumer with unknown node name should fail"); |
| } |
| catch (JMSException e) |
| { |
| Throwable cause = e.getCause(); |
| assertTrue("Unexpected exception cause. Expecting : " + QpidException.class |
| + " got : " + cause.getClass(), cause instanceof QpidException); |
| assertTrue("Unexpected exception message : " + cause.getMessage(), |
| cause.getMessage().contains(expectedMessage)); |
| } |
| |
| try |
| { |
| session.createProducer(queue); |
| fail("Attempt to create a producer with unknown node name should fail"); |
| } |
| catch (JMSException e) |
| { |
| Throwable cause = e.getCause(); |
| assertTrue("Unexpected exception cause. Expecting : " + QpidException.class |
| + " got : " + cause.getClass(), cause instanceof QpidException); |
| assertTrue("Unexpected exception message : " + cause.getMessage(), |
| cause.getMessage().contains(expectedMessage)); |
| } |
| } |
| |
| @Test |
| public void testUnknownExchangeType() throws Exception |
| { |
| createExchangeImpl(false, false, true); |
| } |
| |
| @Test |
| public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception |
| { |
| assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testQueueBrowserWithSelectorClientAcknowledgement() throws Exception |
| { |
| assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testQueueBrowserWithSelectorTransactedSession() throws Exception |
| { |
| assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED); |
| } |
| |
| @Test |
| public void testConsumerWithSelectorAutoAcknowledgement() throws Exception |
| { |
| assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testConsumerWithSelectorClientAcknowldgement() throws Exception |
| { |
| assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testConsumerWithSelectorTransactedSession() throws Exception |
| { |
| assertConsumerWithSelector(Session.SESSION_TRANSACTED); |
| } |
| |
| private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception |
| { |
| String queueAddress = "ADDR:" + getTestName() + ";{create: always}"; |
| |
| boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; |
| Session session = _connection.createSession(transacted, acknowledgement); |
| |
| Queue queue = session.createQueue(queueAddress); |
| |
| final int numberOfMessages = 10; |
| List<Message> sentMessages = Utils.sendMessages(session, queue, numberOfMessages); |
| assertNotNull("Messages were not sent", sentMessages); |
| assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); |
| |
| QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0"); |
| _connection.start(); |
| |
| Enumeration<Message> enumeration = browser.getEnumeration(); |
| |
| int counter = 0; |
| int expectedIndex = 0; |
| while (enumeration.hasMoreElements()) |
| { |
| Message m = enumeration.nextElement(); |
| assertNotNull("Expected not null message at step " + counter, m); |
| int messageIndex = m.getIntProperty(INDEX); |
| assertEquals("Unexpected index", expectedIndex, messageIndex); |
| expectedIndex += 2; |
| counter++; |
| } |
| assertEquals("Unexpected number of messsages received", 5, counter); |
| } |
| |
| private void assertConsumerWithSelector(int acknowledgement) throws Exception |
| { |
| String queueAddress = "ADDR:" + getTestName() + ";{create: always}"; |
| |
| boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; |
| Session session = _connection.createSession(transacted, acknowledgement); |
| |
| Queue queue = session.createQueue(queueAddress); |
| |
| final int numberOfMessages = 10; |
| Utils.sendMessages(session, queue, numberOfMessages); |
| |
| MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0"); |
| |
| int expectedIndex = 0; |
| for (int i = 0; i < 5; i++) |
| { |
| Message m = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Expected not null message at step " + i, m); |
| int messageIndex = m.getIntProperty(INDEX); |
| assertEquals("Unexpected index", expectedIndex, messageIndex); |
| expectedIndex += 2; |
| |
| if (transacted) |
| { |
| session.commit(); |
| } |
| else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE) |
| { |
| m.acknowledge(); |
| } |
| } |
| |
| Message m = consumer.receive(getReceiveTimeout()); |
| assertNull("Unexpected message received", m); |
| } |
| |
| /** |
| * Tests that a client using a session in {@link Session#CLIENT_ACKNOWLEDGE} can correctly |
| * recover a session and re-receive the same message. |
| */ |
| @Test |
| public void testTopicRereceiveAfterRecover() throws Exception |
| { |
| final Session jmsSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); |
| |
| final MessageProducer prod = jmsSession.createProducer(topic); |
| final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); |
| final Message sentMessage = jmsSession.createTextMessage("Hello"); |
| |
| prod.send(sentMessage); |
| Message receivedMessage = consForTopic1.receive(getReceiveTimeout()); |
| assertNotNull("message should be received by consumer", receivedMessage); |
| |
| jmsSession.recover(); |
| receivedMessage = consForTopic1.receive(getReceiveTimeout()); |
| assertNotNull("message should be re-received by consumer after recover", receivedMessage); |
| receivedMessage.acknowledge(); |
| } |
| |
| /** |
| * Tests that a client using a session in {@link Session#SESSION_TRANSACTED} can correctly |
| * rollback a session and re-receive the same message. |
| */ |
| @Test |
| public void testTopicRereceiveAfterRollback() throws Exception |
| { |
| final Session jmsSession = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); |
| |
| final MessageProducer prod = jmsSession.createProducer(topic); |
| final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); |
| final Message sentMessage = jmsSession.createTextMessage("Hello"); |
| |
| prod.send(sentMessage); |
| jmsSession.commit(); |
| |
| Message receivedMessage = consForTopic1.receive(getReceiveTimeout()); |
| assertNotNull("message should be received by consumer", receivedMessage); |
| |
| jmsSession.rollback(); |
| receivedMessage = consForTopic1.receive(getReceiveTimeout()); |
| assertNotNull("message should be re-received by consumer after rollback", receivedMessage); |
| jmsSession.commit(); |
| } |
| |
| /** |
| * Test Goals : |
| * <p> |
| * 1. Verify that link bindings are created and destroyed after creating and closing a subscriber. |
| * 2. Verify that link bindings are created and destroyed after creating and closing a subscriber. |
| */ |
| @Test |
| public void testLinkBindingBehavior() throws Exception |
| { |
| Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String addr = "ADDR:my-queue; {create: always, " + |
| "link: " + |
| "{" + |
| "x-bindings: [{exchange : 'amq.direct', key : test}]," + |
| "}" + |
| "}"; |
| |
| AMQDestination dest = (AMQDestination) jmsSession.createQueue(addr); |
| MessageConsumer cons = jmsSession.createConsumer(dest); |
| AMQSession ssn = (AMQSession) jmsSession; |
| |
| assertTrue("Queue not created as expected", ssn.isQueueExist(dest, true)); |
| assertTrue("Queue not bound as expected", ssn.isQueueBound("amq.direct", "my-queue", "test", null)); |
| |
| cons.close(); // closing consumer, link binding should be removed now. |
| assertTrue("Queue should still be there", ssn.isQueueExist(dest, true)); |
| assertFalse("Binding should not exist anymore", ssn.isQueueBound("amq.direct", "my-queue", "test", null)); |
| |
| MessageProducer prod = jmsSession.createProducer(dest); |
| assertTrue("Queue not bound as expected", ssn.isQueueBound("amq.direct", "my-queue", "test", null)); |
| prod.close(); |
| assertFalse("Binding should not exist anymore", ssn.isQueueBound("amq.direct", "my-queue", "test", null)); |
| } |
| |
| /** |
| * Test Goals : Verifies that the subscription queue created is as specified under link properties. |
| */ |
| @Test |
| public void testCustomizingSubscriptionQueue() throws Exception |
| { |
| Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," + |
| "alternate-exchange: 'amq.fanout'," + |
| "arguments: {'qpid.alert_size': 1000,'qpid.alert_count': 100}" + |
| "}"; |
| |
| String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}"; |
| Destination dest = ssn.createTopic(addr); |
| MessageConsumer cons = ssn.createConsumer(dest); |
| |
| String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; |
| AMQDestination verifyDest = (AMQDestination) ssn.createQueue(verifyAddr); |
| ((AMQSession) ssn).isQueueExist(verifyDest, true); |
| |
| // Verify that the producer does not delete the subscription queue. |
| MessageProducer prod = ssn.createProducer(dest); |
| prod.close(); |
| ((AMQSession) ssn).isQueueExist(verifyDest, true); |
| } |
| |
| /** |
| * QPID-7692: Receiving a message sent using a destination which has no subject fails when client is in BURL dest syntax mode. |
| */ |
| @Test |
| public void testJMSDestination_DestinationWithoutSubject() throws Exception |
| { |
| assumeThat("Only supported by AMQP 0-10", getProtocol(), is(equalTo("0-10"))); |
| |
| _connection.close(); |
| |
| doTestJMSDestiation_DestinationWithoutSubject("BURL"); |
| doTestJMSDestiation_DestinationWithoutSubject("ADDR"); |
| } |
| |
| private void doTestJMSDestiation_DestinationWithoutSubject(final String clientDestSyntaxDefault) throws Exception |
| { |
| // The client uses the default destination syntax when processing the destination on received messages |
| System.setProperty(ClientProperties.DEST_SYNTAX, clientDestSyntaxDefault); |
| try |
| { |
| |
| String exchange = "myfanout"; |
| Connection connection = getConnection(); |
| connection.start(); |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| String publisher = |
| String.format("ADDR:%s; {create: always, node: {type: topic, x-declare: { type: fanout }}}", |
| exchange); |
| String subscriber = String.format( |
| "ADDR:sub1; {create: always, node : {type : queue, x-bindings: [{ exchange: '%s', key: sub1 }]}}", |
| exchange); |
| |
| Destination pubDest = session.createQueue(publisher); |
| Destination subDest = session.createQueue(subscriber); |
| |
| MessageProducer producer = session.createProducer(pubDest); |
| MessageConsumer consumer = session.createConsumer(subDest); |
| |
| Message m = session.createMessage(); |
| producer.send(m); |
| session.commit(); |
| |
| Message receivedMessage = consumer.receive(getReceiveTimeout()); |
| assertNotNull("Message did not arrive", receivedMessage); |
| AMQDestination jmsDestination = (AMQDestination) receivedMessage.getJMSDestination(); |
| assertNotNull("Received message should have JMSDestination", jmsDestination); |
| |
| assertEquals("Unexpected exchange within JMSDestination", exchange, jmsDestination.getExchangeName()); |
| assertNull("Unexpected subject within JMSDestination", jmsDestination.getSubject()); |
| session.commit(); |
| connection.close(); |
| } |
| finally |
| { |
| System.clearProperty(ClientProperties.DEST_SYNTAX); |
| } |
| } |
| |
| private boolean isBroker010() |
| { |
| return "0-10".equals(getProtocol()); |
| } |
| |
| private boolean isCppBroker() |
| { |
| return getBrokerAdmin().getBrokerType() == BrokerAdmin.BrokerType.CPP; |
| } |
| } |