| /** | 
 |  * Licensed to the Apache Software Foundation (ASF) under one or more | 
 |  * contributor license agreements.  See the NOTICE file distributed with | 
 |  * this work for additional information regarding copyright ownership. | 
 |  * The ASF licenses this file to You under the Apache License, Version 2.0 | 
 |  * (the "License"); you may not use this file except in compliance with | 
 |  * the License.  You may obtain a copy of the License at | 
 |  * | 
 |  *      http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 | package org.apache.activemq; | 
 |  | 
 | import javax.jms.Connection; | 
 | import javax.jms.JMSException; | 
 | import javax.jms.Message; | 
 | import javax.jms.MessageConsumer; | 
 | import javax.jms.MessageListener; | 
 | import javax.jms.MessageProducer; | 
 | import javax.jms.Queue; | 
 | import javax.jms.Session; | 
 | import javax.jms.TextMessage; | 
 |  | 
 | import org.apache.activemq.broker.BrokerService; | 
 | import org.apache.activemq.broker.region.Subscription; | 
 | import org.apache.activemq.broker.region.policy.PolicyEntry; | 
 | import org.apache.activemq.broker.region.policy.PolicyMap; | 
 | import org.apache.activemq.command.ActiveMQDestination; | 
 | import org.apache.activemq.command.ActiveMQQueue; | 
 | import org.apache.activemq.command.ConsumerControl; | 
 | import org.apache.activemq.command.ExceptionResponse; | 
 | import org.apache.activemq.spring.SpringConsumer; | 
 | import org.slf4j.Logger; | 
 | import org.slf4j.LoggerFactory; | 
 |  | 
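/**
 * Tests pull-style consumption on queues whose consumers use a prefetch size
 * of zero, configured either through the {@code consumer.prefetchSize=0}
 * destination option (see {@link #createQueue()}) or through a broker-side
 * destination policy (see {@link #createBroker()}).
 */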
 | public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { | 
 |  | 
    private static final Logger LOG = LoggerFactory.getLogger(ZeroPrefetchConsumerTest.class);

    /** Client connection; created and started in {@link #setUp()} and closed in {@link #tearDown()}. */
    protected Connection connection;
    /** Queue returned by {@link #createQueue()}; assigned in {@link #setUp()}. */
    protected Queue queue;
    /** Queue for which {@link #createBroker()} installs a destination policy with a queue prefetch of 0. */
    protected Queue brokerZeroQueue = new ActiveMQQueue("brokerZeroConfig");
 |  | 
 |     public void testCannotUseMessageListener() throws Exception { | 
 |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |  | 
 |         MessageListener listener = new SpringConsumer(); | 
 |         try { | 
 |             consumer.setMessageListener(listener); | 
 |             fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); | 
 |         } catch (JMSException e) { | 
 |             LOG.info("Received expected exception : " + e); | 
 |         } | 
 |     } | 
 |  | 
 |     public void testPullConsumerWorks() throws Exception { | 
 |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Hello World!")); | 
 |  | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |         Message answer = consumer.receive(5000); | 
 |         assertNotNull("Should have received a message!", answer); | 
 |         // check if method will return at all and will return a null | 
 |         answer = consumer.receive(1); | 
 |         assertNull("Should have not received a message!", answer); | 
 |         answer = consumer.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     public void testIdleConsumer() throws Exception { | 
 |         doTestIdleConsumer(false); | 
 |     } | 
 |  | 
 |     public void testIdleConsumerTranscated() throws Exception { | 
 |         doTestIdleConsumer(true); | 
 |     } | 
 |  | 
 |     private void doTestIdleConsumer(boolean transacted) throws Exception { | 
 |         Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         producer.send(session.createTextMessage("Msg2")); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |  | 
 |         session.createConsumer(queue); | 
 |         TextMessage answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // this call would return null if prefetchSize > 0 | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg2"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     public void testRecvRecvCommit() throws Exception { | 
 |         doTestRecvRecvCommit(false); | 
 |     } | 
 |  | 
 |     public void testRecvRecvCommitTranscated() throws Exception { | 
 |         doTestRecvRecvCommit(true); | 
 |     } | 
 |  | 
 |     private void doTestRecvRecvCommit(boolean transacted) throws Exception { | 
 |         Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         producer.send(session.createTextMessage("Msg2")); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |         TextMessage answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |         answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg2"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     public void testTwoConsumers() throws Exception { | 
 |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         producer.send(session.createTextMessage("Msg2")); | 
 |  | 
 |         // now lets receive it | 
 |         MessageConsumer consumer1 = session.createConsumer(queue); | 
 |         MessageConsumer consumer2 = session.createConsumer(queue); | 
 |         TextMessage answer = (TextMessage)consumer1.receiveNoWait(); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |         answer = (TextMessage)consumer2.receiveNoWait(); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg2"); | 
 |  | 
 |         answer = (TextMessage)consumer2.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     // https://issues.apache.org/activemq/browse/AMQ-2567 | 
 |     public void testManyMessageConsumer() throws Exception { | 
 |         doTestManyMessageConsumer(true); | 
 |     } | 
 |  | 
 |     public void testManyMessageConsumerNoTransaction() throws Exception { | 
 |         doTestManyMessageConsumer(false); | 
 |     } | 
 |  | 
 |     private void doTestManyMessageConsumer(boolean transacted) throws Exception { | 
 |         Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         producer.send(session.createTextMessage("Msg2")); | 
 |         producer.send(session.createTextMessage("Msg3")); | 
 |         producer.send(session.createTextMessage("Msg4")); | 
 |         producer.send(session.createTextMessage("Msg5")); | 
 |         producer.send(session.createTextMessage("Msg6")); | 
 |         producer.send(session.createTextMessage("Msg7")); | 
 |         producer.send(session.createTextMessage("Msg8")); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |  | 
 |         MessageConsumer consumer2  = session.createConsumer(queue); | 
 |         TextMessage answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg2"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg3"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // this call would return null if prefetchSize > 0 | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg4"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // Now using other consumer | 
 |         // this call should return the next message (Msg5) still left on the queue | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg5"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // Now using other consumer | 
 |         // this call should return the next message still left on the queue | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg6"); | 
 |         // read one more message without commit | 
 |         // this call should return the next message still left on the queue | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg7"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // Now using other consumer | 
 |         // this call should return the next message (Msg5) still left on the queue | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg8"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     public void testManyMessageConsumerWithSend() throws Exception { | 
 |         doTestManyMessageConsumerWithSend(true); | 
 |     } | 
 |  | 
 |     public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception { | 
 |         ((ActiveMQConnection)connection).setMessagePrioritySupported(true); | 
 |         doTestManyMessageConsumerWithSend(true); | 
 |     } | 
 |  | 
 |     public void testManyMessageConsumerWithSendNoTransaction() throws Exception { | 
 |         doTestManyMessageConsumerWithSend(false); | 
 |     } | 
 |  | 
 |     private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception { | 
 |         Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(queue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         producer.send(session.createTextMessage("Msg2")); | 
 |         producer.send(session.createTextMessage("Msg3")); | 
 |         producer.send(session.createTextMessage("Msg4")); | 
 |         producer.send(session.createTextMessage("Msg5")); | 
 |         producer.send(session.createTextMessage("Msg6")); | 
 |         producer.send(session.createTextMessage("Msg7")); | 
 |         producer.send(session.createTextMessage("Msg8")); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(queue); | 
 |  | 
 |         MessageConsumer consumer2  = session.createConsumer(queue); | 
 |         TextMessage answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg2"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg3"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         // Now using other consumer take 2 | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg4"); | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg5"); | 
 |  | 
 |         // ensure prefetch extension ok by sending another that could get dispatched | 
 |         producer.send(session.createTextMessage("Msg9")); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |  | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg6"); | 
 |         // read one more message without commit | 
 |         // and using other consumer | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg7"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |  | 
 |         answer = (TextMessage)consumer2.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg8"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |  | 
 |         answer = (TextMessage)consumer.receive(5000); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg9"); | 
 |         if (transacted) { | 
 |             session.commit(); | 
 |         } | 
 |         answer = (TextMessage)consumer.receiveNoWait(); | 
 |         assertNull("Should have not received a message!", answer); | 
 |     } | 
 |  | 
 |     // https://issues.apache.org/jira/browse/AMQ-4224 | 
 |     public void testBrokerZeroPrefetchConfig() throws Exception { | 
 |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         MessageProducer producer = session.createProducer(brokerZeroQueue); | 
 |         producer.send(session.createTextMessage("Msg1")); | 
 |         // now lets receive it | 
 |         MessageConsumer consumer = session.createConsumer(brokerZeroQueue); | 
 |  | 
 |         TextMessage answer = (TextMessage)consumer.receive(5000); | 
 |         assertNotNull("Consumer should have read a message", answer); | 
 |         assertEquals("Should have received a message!", answer.getText(), "Msg1"); | 
 |     } | 
 |  | 
 |     // https://issues.apache.org/jira/browse/AMQ-4234 | 
 |     // https://issues.apache.org/jira/browse/AMQ-4235 | 
 |     public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception { | 
 |         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | 
 |  | 
 |         ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue); | 
 |         assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); | 
 |  | 
 |         // verify sub view broker | 
 |         Subscription sub = | 
 |                 broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); | 
 |         assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); | 
 |  | 
 |         // manipulate Prefetch (like failover and stomp) | 
 |         ConsumerControl consumerControl = new ConsumerControl(); | 
 |         consumerControl.setConsumerId(consumer.info.getConsumerId()); | 
 |         consumerControl.setDestination(ActiveMQDestination.transform(brokerZeroQueue)); | 
 |         consumerControl.setPrefetch(1000); // default for a q | 
 |  | 
 |         Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl); | 
 |         assertTrue("good request", !(reply instanceof ExceptionResponse)); | 
 |         assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); | 
 |         assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); | 
 |     } | 
 |  | 
 |     @Override | 
 |     protected BrokerService createBroker() throws Exception { | 
 |         BrokerService brokerService = super.createBroker(); | 
 |         PolicyMap policyMap = new PolicyMap(); | 
 |         PolicyEntry zeroPrefetchPolicy = new PolicyEntry(); | 
 |         zeroPrefetchPolicy.setQueuePrefetch(0); | 
 |         policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy); | 
 |         brokerService.setDestinationPolicy(policyMap); | 
 |         return brokerService; | 
 |     } | 
 |  | 
 |     @Override | 
 |     protected void setUp() throws Exception { | 
 |         bindAddress = "tcp://localhost:0"; | 
 |         super.setUp(); | 
 |  | 
 |         connection = createConnection(); | 
 |         connection.start(); | 
 |         queue = createQueue(); | 
 |     } | 
 |  | 
 |     @Override | 
 |     protected void startBroker() throws Exception { | 
 |         super.startBroker(); | 
 |         bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString(); | 
 |     } | 
 |  | 
 |     @Override | 
 |     protected void tearDown() throws Exception { | 
 |         try { | 
 |             connection.close(); | 
 |         } catch (Exception ex) {} | 
 |  | 
 |         super.tearDown(); | 
 |     } | 
 |  | 
 |     protected Queue createQueue() { | 
 |         return new ActiveMQQueue(getDestinationString() + "?consumer.prefetchSize=0"); | 
 |     } | 
 | } |