| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq; |
| |
| import javax.jms.Connection; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.spring.SpringConsumer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * |
| */ |
| public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ZeroPrefetchConsumerTest.class); |
| |
| protected Connection connection; |
| protected Queue queue; |
| |
| public void testCannotUseMessageListener() throws Exception { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| MessageListener listener = new SpringConsumer(); |
| try { |
| consumer.setMessageListener(listener); |
| fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); |
| } catch (JMSException e) { |
| LOG.info("Received expected exception : " + e); |
| } |
| } |
| |
| public void testPullConsumerWorks() throws Exception { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Hello World!")); |
| |
| // now lets receive it |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message answer = consumer.receive(5000); |
| assertNotNull("Should have received a message!", answer); |
| // check if method will return at all and will return a null |
| answer = consumer.receive(1); |
| assertNull("Should have not received a message!", answer); |
| answer = consumer.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| public void testIdleConsumer() throws Exception { |
| doTestIdleConsumer(false); |
| } |
| |
| public void testIdleConsumerTranscated() throws Exception { |
| doTestIdleConsumer(true); |
| } |
| |
| private void doTestIdleConsumer(boolean transacted) throws Exception { |
| Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Msg1")); |
| producer.send(session.createTextMessage("Msg2")); |
| if (transacted) { |
| session.commit(); |
| } |
| // now lets receive it |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| session.createConsumer(queue); |
| TextMessage answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg1"); |
| if (transacted) { |
| session.commit(); |
| } |
| // this call would return null if prefetchSize > 0 |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg2"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| public void testRecvRecvCommit() throws Exception { |
| doTestRecvRecvCommit(false); |
| } |
| |
| public void testRecvRecvCommitTranscated() throws Exception { |
| doTestRecvRecvCommit(true); |
| } |
| |
| private void doTestRecvRecvCommit(boolean transacted) throws Exception { |
| Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Msg1")); |
| producer.send(session.createTextMessage("Msg2")); |
| if (transacted) { |
| session.commit(); |
| } |
| // now lets receive it |
| MessageConsumer consumer = session.createConsumer(queue); |
| TextMessage answer = (TextMessage)consumer.receiveNoWait(); |
| assertEquals("Should have received a message!", answer.getText(), "Msg1"); |
| answer = (TextMessage)consumer.receiveNoWait(); |
| assertEquals("Should have received a message!", answer.getText(), "Msg2"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| public void testTwoConsumers() throws Exception { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Msg1")); |
| producer.send(session.createTextMessage("Msg2")); |
| |
| // now lets receive it |
| MessageConsumer consumer1 = session.createConsumer(queue); |
| MessageConsumer consumer2 = session.createConsumer(queue); |
| TextMessage answer = (TextMessage)consumer1.receiveNoWait(); |
| assertEquals("Should have received a message!", answer.getText(), "Msg1"); |
| answer = (TextMessage)consumer2.receiveNoWait(); |
| assertEquals("Should have received a message!", answer.getText(), "Msg2"); |
| |
| answer = (TextMessage)consumer2.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| // https://issues.apache.org/activemq/browse/AMQ-2567 |
| public void testManyMessageConsumer() throws Exception { |
| doTestManyMessageConsumer(true); |
| } |
| |
| public void testManyMessageConsumerNoTransaction() throws Exception { |
| doTestManyMessageConsumer(false); |
| } |
| |
| private void doTestManyMessageConsumer(boolean transacted) throws Exception { |
| Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Msg1")); |
| producer.send(session.createTextMessage("Msg2")); |
| producer.send(session.createTextMessage("Msg3")); |
| producer.send(session.createTextMessage("Msg4")); |
| producer.send(session.createTextMessage("Msg5")); |
| producer.send(session.createTextMessage("Msg6")); |
| producer.send(session.createTextMessage("Msg7")); |
| producer.send(session.createTextMessage("Msg8")); |
| if (transacted) { |
| session.commit(); |
| } |
| // now lets receive it |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| MessageConsumer consumer2 = session.createConsumer(queue); |
| TextMessage answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg1"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg2"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg3"); |
| if (transacted) { |
| session.commit(); |
| } |
| // this call would return null if prefetchSize > 0 |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg4"); |
| if (transacted) { |
| session.commit(); |
| } |
| // Now using other consumer |
| // this call should return the next message (Msg5) still left on the queue |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg5"); |
| if (transacted) { |
| session.commit(); |
| } |
| // Now using other consumer |
| // this call should return the next message (Msg5) still left on the queue |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg6"); |
| // read one more message without commit |
| // Now using other consumer |
| // this call should return the next message (Msg5) still left on the queue |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg7"); |
| if (transacted) { |
| session.commit(); |
| } |
| // Now using other consumer |
| // this call should return the next message (Msg5) still left on the queue |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg8"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| public void testManyMessageConsumerWithSend() throws Exception { |
| doTestManyMessageConsumerWithSend(true); |
| } |
| |
| public void testManyMessageConsumerWithSendNoTransaction() throws Exception { |
| doTestManyMessageConsumerWithSend(false); |
| } |
| |
| private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception { |
| Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(session.createTextMessage("Msg1")); |
| producer.send(session.createTextMessage("Msg2")); |
| producer.send(session.createTextMessage("Msg3")); |
| producer.send(session.createTextMessage("Msg4")); |
| producer.send(session.createTextMessage("Msg5")); |
| producer.send(session.createTextMessage("Msg6")); |
| producer.send(session.createTextMessage("Msg7")); |
| producer.send(session.createTextMessage("Msg8")); |
| if (transacted) { |
| session.commit(); |
| } |
| // now lets receive it |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| MessageConsumer consumer2 = session.createConsumer(queue); |
| TextMessage answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg1"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg2"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg3"); |
| if (transacted) { |
| session.commit(); |
| } |
| // Now using other consumer take 2 |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg4"); |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg5"); |
| |
| // ensure prefetch extension ok by sending another that could get dispatched |
| producer.send(session.createTextMessage("Msg9")); |
| if (transacted) { |
| session.commit(); |
| } |
| |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg6"); |
| // read one more message without commit |
| // and using other consumer |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg7"); |
| if (transacted) { |
| session.commit(); |
| } |
| |
| answer = (TextMessage)consumer2.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg8"); |
| if (transacted) { |
| session.commit(); |
| } |
| |
| answer = (TextMessage)consumer.receive(5000); |
| assertEquals("Should have received a message!", answer.getText(), "Msg9"); |
| if (transacted) { |
| session.commit(); |
| } |
| answer = (TextMessage)consumer.receiveNoWait(); |
| assertNull("Should have not received a message!", answer); |
| } |
| |
| protected void setUp() throws Exception { |
| bindAddress = "tcp://localhost:0"; |
| super.setUp(); |
| |
| connection = createConnection(); |
| connection.start(); |
| queue = createQueue(); |
| } |
| |
| protected void startBroker() throws Exception { |
| super.startBroker(); |
| bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString(); |
| } |
| |
| protected void tearDown() throws Exception { |
| connection.close(); |
| super.tearDown(); |
| } |
| |
| protected Queue createQueue() { |
| return new ActiveMQQueue(getDestinationString() + "?consumer.prefetchSize=0"); |
| } |
| |
| } |