| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq; |
| |
| import java.util.ArrayList; |
| import java.util.Enumeration; |
| |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.QueueBrowser; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| |
| import org.apache.activemq.test.JmsResourceProvider; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * |
| */ |
| public class JmsQueueTransactionTest extends JmsTransactionTestSupport { |
| private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class); |
| |
| /** |
| * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider() |
| */ |
| protected JmsResourceProvider getJmsResourceProvider() { |
| JmsResourceProvider p = new JmsResourceProvider(); |
| p.setTopic(false); |
| return p; |
| } |
| |
| /** |
| * Tests if the the connection gets reset, the messages will still be |
| * received. |
| * |
| * @throws Exception |
| */ |
| public void testReceiveTwoThenCloseConnection() throws Exception { |
| Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; |
| |
| // lets consume any outstanding messages from previous test runs |
| beginTx(); |
| while (consumer.receive(1000) != null) { |
| } |
| commitTx(); |
| |
| beginTx(); |
| producer.send(outbound[0]); |
| producer.send(outbound[1]); |
| commitTx(); |
| |
| LOG.info("Sent 0: " + outbound[0]); |
| LOG.info("Sent 1: " + outbound[1]); |
| |
| ArrayList<Message> messages = new ArrayList<Message>(); |
| beginTx(); |
| Message message = consumer.receive(2000); |
| assertEquals(outbound[0], message); |
| |
| message = consumer.receive(2000); |
| assertNotNull(message); |
| assertEquals(outbound[1], message); |
| |
| // Close and reopen connection. |
| reconnect(); |
| |
| // Consume again.. the previous message should |
| // get redelivered. |
| beginTx(); |
| message = consumer.receive(2000); |
| assertNotNull("Should have re-received the first message again!", message); |
| messages.add(message); |
| assertEquals(outbound[0], message); |
| |
| message = consumer.receive(5000); |
| assertNotNull("Should have re-received the second message again!", message); |
| messages.add(message); |
| assertEquals(outbound[1], message); |
| commitTx(); |
| |
| Message inbound[] = new Message[messages.size()]; |
| messages.toArray(inbound); |
| |
| assertTextMessagesEqual("Rollback did not work", outbound, inbound); |
| } |
| |
| /** |
| * Tests sending and receiving messages with two sessions(one for producing |
| * and another for consuming). |
| * |
| * @throws Exception |
| */ |
| public void testSendReceiveInSeperateSessionTest() throws Exception { |
| session.close(); |
| int batchCount = 10; |
| |
| for (int i = 0; i < batchCount; i++) { |
| // Session that sends messages |
| { |
| Session session = resourceProvider.createSession(connection); |
| this.session = session; |
| MessageProducer producer = resourceProvider.createProducer(session, destination); |
| // consumer = resourceProvider.createConsumer(session, |
| // destination); |
| beginTx(); |
| producer.send(session.createTextMessage("Test Message: " + i)); |
| commitTx(); |
| session.close(); |
| } |
| |
| // Session that consumes messages |
| { |
| Session session = resourceProvider.createSession(connection); |
| this.session = session; |
| MessageConsumer consumer = resourceProvider.createConsumer(session, destination); |
| |
| beginTx(); |
| TextMessage message = (TextMessage)consumer.receive(1000 * 5); |
| assertNotNull("Received only " + i + " messages in batch ", message); |
| assertEquals("Test Message: " + i, message.getText()); |
| |
| commitTx(); |
| session.close(); |
| } |
| } |
| } |
| |
| /** |
| * Tests the queue browser. Browses the messages then the consumer tries to |
| * receive them. The messages should still be in the queue even when it was |
| * browsed. |
| * |
| * @throws Exception |
| */ |
| public void testReceiveBrowseReceive() throws Exception { |
| Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")}; |
| |
| // lets consume any outstanding messages from previous test runs |
| beginTx(); |
| while (consumer.receive(1000) != null) { |
| } |
| commitTx(); |
| |
| beginTx(); |
| producer.send(outbound[0]); |
| producer.send(outbound[1]); |
| producer.send(outbound[2]); |
| commitTx(); |
| |
| // Get the first. |
| beginTx(); |
| assertEquals(outbound[0], consumer.receive(1000)); |
| consumer.close(); |
| commitTx(); |
| |
| beginTx(); |
| QueueBrowser browser = session.createBrowser((Queue)destination); |
| Enumeration enumeration = browser.getEnumeration(); |
| |
| // browse the second |
| assertTrue("should have received the second message", enumeration.hasMoreElements()); |
| assertEquals(outbound[1], (Message)enumeration.nextElement()); |
| |
| // browse the third. |
| assertTrue("Should have received the third message", enumeration.hasMoreElements()); |
| assertEquals(outbound[2], (Message)enumeration.nextElement()); |
| |
| // There should be no more. |
| boolean tooMany = false; |
| while (enumeration.hasMoreElements()) { |
| LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText()); |
| tooMany = true; |
| } |
| assertFalse(tooMany); |
| browser.close(); |
| |
| // Re-open the consumer. |
| consumer = resourceProvider.createConsumer(session, destination); |
| // Receive the second. |
| assertEquals(outbound[1], consumer.receive(1000)); |
| // Receive the third. |
| assertEquals(outbound[2], consumer.receive(1000)); |
| consumer.close(); |
| |
| commitTx(); |
| } |
| |
| } |