| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.jms.DeliveryMode; |
| |
| import junit.framework.Test; |
| |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.ConnectionInfo; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.LocalTransactionId; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.command.RemoveInfo; |
| import org.apache.activemq.command.SessionInfo; |
| |
| public class BrokerTest extends BrokerTestSupport { |
| |
| public ActiveMQDestination destination; |
| public int deliveryMode; |
| public int prefetch; |
| public byte destinationType; |
| public boolean durableConsumer; |
| protected static final int MAX_NULL_WAIT=500; |
| |
| public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(1); |
| connection1.request(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(1); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.request(consumerInfo2); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection1); |
| Message m2 = receiveMessage(connection2); |
| |
| assertNotNull("m1 is null for index: " + i, m1); |
| assertNotNull("m2 is null for index: " + i, m2); |
| |
| assertNotSame(m1.getMessageId(), m2.getMessageId()); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void initCombosForTestQueueBrowserWith2Consumers() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testQueueBrowserWith2Consumers() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(10); |
| connection1.request(consumerInfo1); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| //as the messages are sent async - need to synchronize the last |
| //one to ensure they arrive in the order we want |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Setup a second connection with a queue browser. |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(1); |
| consumerInfo2.setBrowser(true); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.request(consumerInfo2); |
| |
| List<Message> messages = new ArrayList<Message>(); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull("m1 is null for index: " + i, m1); |
| messages.add(m1); |
| } |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = messages.get(i); |
| Message m2 = receiveMessage(connection2); |
| assertNotNull("m2 is null for index: " + i, m2); |
| assertEquals(m1.getMessageId(), m2.getMessageId()); |
| connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| assertNoMessagesLeft(connection2); |
| } |
| |
| |
| /* |
| * change the order of the above test |
| */ |
| public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| deliveryMode = DeliveryMode.NON_PERSISTENT; |
| |
| |
| // Setup a second connection with a queue browser. |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(10); |
| consumerInfo2.setBrowser(true); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.request(consumerInfo2); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(10); |
| connection1.request(consumerInfo1); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| //as the messages are sent async - need to synchronize the last |
| //one to ensure they arrive in the order we want |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| |
| List<Message> messages = new ArrayList<Message>(); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull("m1 is null for index: " + i, m1); |
| messages.add(m1); |
| } |
| |
| // no messages present in queue browser as there were no messages when it |
| // was created |
| assertNoMessagesLeft(connection1); |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void testQueueBrowserWith2ConsumersInterleaved() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| deliveryMode = DeliveryMode.NON_PERSISTENT; |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(10); |
| connection1.request(consumerInfo1); |
| |
| // Send the messages |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Setup a second connection with a queue browser. |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(1); |
| consumerInfo2.setBrowser(true); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.request(consumerInfo2); |
| |
| |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| //as the messages are sent async - need to synchronize the last |
| //one to ensure they arrive in the order we want |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| |
| List<Message> messages = new ArrayList<Message>(); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull("m1 is null for index: " + i, m1); |
| messages.add(m1); |
| } |
| |
| // a browse is a snapshot - only guarantee to see messages produced before |
| // the browser |
| for (int i = 0; i < 1; i++) { |
| Message m1 = messages.get(i); |
| Message m2 = receiveMessage(connection2); |
| assertNotNull("m2 is null for index: " + i, m2); |
| assertEquals(m1.getMessageId(), m2.getMessageId()); |
| connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| |
| connection1.request(closeConnectionInfo(connectionInfo1)); |
| connection1.stop(); |
| connection2.request(closeConnectionInfo(connectionInfo2)); |
| connection2.stop(); |
| } |
| |
| |
| public void initCombosForTestConsumerPrefetchAndStandardAck() { |
| addCombinationValues("deliveryMode", new Object[] { |
| // Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testConsumerPrefetchAndStandardAck() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setPrefetchSize(1); |
| connection.send(consumerInfo); |
| |
| // Send 3 messages to the broker. |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Make sure only 1 message was delivered. |
| Message m1 = receiveMessage(connection); |
| assertNotNull(m1); |
| assertNoMessagesLeft(connection); |
| |
| // Acknowledge the first message. This should cause the next message to |
| // get dispatched. |
| connection.send(createAck(consumerInfo, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| |
| Message m2 = receiveMessage(connection); |
| assertNotNull(m2); |
| connection.send(createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE)); |
| |
| Message m3 = receiveMessage(connection); |
| assertNotNull(m3); |
| connection.send(createAck(consumerInfo, m3, 1, MessageAck.STANDARD_ACK_TYPE)); |
| |
| connection.send(closeConnectionInfo(connectionInfo)); |
| } |
| |
| public void initCombosForTestTransactedAckWithPrefetchOfOne() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testTransactedAckWithPrefetchOfOne() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| destination = createDestinationInfo(connection1, connectionInfo1, destinationType); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(1); |
| connection1.send(consumerInfo1); |
| |
| // Send the messages |
| for (int i = 0; i < 4; i++) { |
| Message message = createMessage(producerInfo1, destination, deliveryMode); |
| connection1.send(message); |
| } |
| |
| |
| |
| // Now get the messages. |
| for (int i = 0; i < 4; i++) { |
| // Begin the transaction. |
| LocalTransactionId txid = createLocalTransaction(sessionInfo1); |
| connection1.send(createBeginTransaction(connectionInfo1, txid)); |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE); |
| ack.setTransactionId(txid); |
| connection1.send(ack); |
| // Commit the transaction. |
| connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); |
| } |
| assertNoMessagesLeft(connection1); |
| } |
| |
| public void initCombosForTestTransactedSend() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testTransactedSend() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| destination = createDestinationInfo(connection1, connectionInfo1, destinationType); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // Begin the transaction. |
| LocalTransactionId txid = createLocalTransaction(sessionInfo1); |
| connection1.send(createBeginTransaction(connectionInfo1, txid)); |
| |
| // Send the messages |
| for (int i = 0; i < 4; i++) { |
| Message message = createMessage(producerInfo1, destination, deliveryMode); |
| message.setTransactionId(txid); |
| connection1.request(message); |
| } |
| |
| // The point of this test is that message should not be delivered until |
| // send is committed. |
| assertNull(receiveMessage(connection1,MAX_NULL_WAIT)); |
| |
| // Commit the transaction. |
| connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); |
| |
| // Now get the messages. |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| } |
| |
| public void initCombosForTestQueueTransactedAck() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); |
| } |
| |
| public void testQueueTransactedAck() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| destination = createDestinationInfo(connection1, connectionInfo1, destinationType); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // Send the messages |
| for (int i = 0; i < 4; i++) { |
| Message message = createMessage(producerInfo1, destination, deliveryMode); |
| connection1.send(message); |
| } |
| |
| // Begin the transaction. |
| LocalTransactionId txid = createLocalTransaction(sessionInfo1); |
| connection1.send(createBeginTransaction(connectionInfo1, txid)); |
| |
| // Acknowledge the first 2 messages. |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull("m1 is null for index: " + i, m1); |
| MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE); |
| ack.setTransactionId(txid); |
| connection1.request(ack); |
| } |
| |
| // Commit the transaction. |
| connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); |
| |
| // The queue should now only have the remaining 2 messages |
| assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); |
| } |
| |
| public void testTopicDurableSubscriptionCanBeRestored() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQTopic("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| connectionInfo1.setClientId("clientid1"); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setSubscriptionName("test"); |
| connection1.send(consumerInfo1); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); |
| connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); |
| connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); |
| connection1.request(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); |
| |
| // Get the messages |
| Message m = null; |
| for (int i = 0; i < 2; i++) { |
| m = receiveMessage(connection1); |
| assertNotNull(m); |
| } |
| // Ack the last message. |
| connection1.send(createAck(consumerInfo1, m, 2, MessageAck.STANDARD_ACK_TYPE)); |
| // Close the connection. |
| connection1.request(closeConnectionInfo(connectionInfo1)); |
| connection1.stop(); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| connectionInfo2.setClientId("clientid1"); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(100); |
| consumerInfo2.setSubscriptionName("test"); |
| |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(consumerInfo2); |
| |
| // Get the rest of the messages |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection2); |
| assertNotNull("m1 is null for index: " + i, m1); |
| } |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(1); |
| connection1.send(consumerInfo1); |
| |
| // Send the messages. |
| for (int i = 0; i < 4; i++) { |
| Message message = createMessage(producerInfo, destination, deliveryMode); |
| message.setGroupID("TEST-GROUP"); |
| message.setGroupSequence(i + 1); |
| connection1.request(message); |
| } |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(1); |
| connection2.send(consumerInfo2); |
| |
| // All the messages should have been sent down connection 1.. just get |
| // the first 3 |
| for (int i = 0; i < 3; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull("m1 is null for index: " + i, m1); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| // Close the first consumer. |
| connection1.request(closeConsumerInfo(consumerInfo1)); |
| |
| // The last messages should now go the the second consumer. |
| for (int i = 0; i < 1; i++) { |
| Message m1 = receiveMessage(connection2); |
| assertNotNull("m1 is null for index: " + i, m1); |
| connection2.request(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE}); |
| } |
| |
| public void testTopicConsumerOnlySeeMessagesAfterCreation() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQTopic("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| connectionInfo1.setClientId("A"); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| // Send the 1st message |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| |
| // Create the durable subscription. |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| if (durableConsumer) { |
| consumerInfo1.setSubscriptionName("test"); |
| } |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| Message m = createMessage(producerInfo1, destination, deliveryMode); |
| connection1.send(m); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| |
| // Subscription should skip over the first message |
| Message m2 = receiveMessage(connection1); |
| assertNotNull(m2); |
| assertEquals(m.getMessageId(), m2.getMessageId()); |
| m2 = receiveMessage(connection1); |
| assertNotNull(m2); |
| |
| assertNoMessagesLeft(connection1); |
| } |
| |
| public void initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE}); |
| } |
| |
| public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQTopic("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| connectionInfo1.setClientId("A"); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| // Send the messages |
| Message m = createMessage(producerInfo1, destination, deliveryMode); |
| connection1.send(m); |
| |
| // Create the durable subscription. |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| if (durableConsumer) { |
| consumerInfo1.setSubscriptionName("test"); |
| } |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setRetroactive(true); |
| connection1.send(consumerInfo1); |
| |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.request(createMessage(producerInfo1, destination, deliveryMode)); |
| |
| // the behavior is VERY dependent on the recovery policy used. |
| // But the default broker settings try to make it as consistent as |
| // possible |
| |
| // Subscription should see all messages sent. |
| Message m2 = receiveMessage(connection1); |
| assertNotNull(m2); |
| assertEquals(m.getMessageId(), m2.getMessageId()); |
| for (int i = 0; i < 2; i++) { |
| m2 = receiveMessage(connection1); |
| assertNotNull(m2); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| } |
| |
| // |
| // TODO: need to reimplement this since we don't fail when we send to a |
| // non-existant |
| // destination. But if we can access the Region directly then we should be |
| // able to |
| // check that if the destination was removed. |
| // |
| // public void initCombosForTestTempDestinationsRemovedOnConnectionClose() { |
| // addCombinationValues( "deliveryMode", new Object[]{ |
| // Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| // Integer.valueOf(DeliveryMode.PERSISTENT)} ); |
| // addCombinationValues( "destinationType", new Object[]{ |
| // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); |
| // } |
| // |
| // public void testTempDestinationsRemovedOnConnectionClose() throws |
| // Exception { |
| // |
| // // Setup a first connection |
| // StubConnection connection1 = createConnection(); |
| // ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| // connection1.send(connectionInfo1); |
| // connection1.send(sessionInfo1); |
| // connection1.send(producerInfo1); |
| // |
| // destination = createDestinationInfo(connection1, connectionInfo1, |
| // destinationType); |
| // |
| // StubConnection connection2 = createConnection(); |
| // ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| // SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| // ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); |
| // connection2.send(connectionInfo2); |
| // connection2.send(sessionInfo2); |
| // connection2.send(producerInfo2); |
| // |
| // // Send from connection2 to connection1's temp destination. Should |
| // succeed. |
| // connection2.send(createMessage(producerInfo2, destination, |
| // deliveryMode)); |
| // |
| // // Close connection 1 |
| // connection1.request(closeConnectionInfo(connectionInfo1)); |
| // |
| // try { |
| // // Send from connection2 to connection1's temp destination. Should not |
| // succeed. |
| // connection2.request(createMessage(producerInfo2, destination, |
| // deliveryMode)); |
| // fail("Expected JMSException."); |
| // } catch ( JMSException success ) { |
| // } |
| // |
| // } |
| |
| // public void initCombosForTestTempDestinationsAreNotAutoCreated() { |
| // addCombinationValues( "deliveryMode", new Object[]{ |
| // Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| // Integer.valueOf(DeliveryMode.PERSISTENT)} ); |
| // addCombinationValues( "destinationType", new Object[]{ |
| // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); |
| // } |
| // |
| // |
| |
| // We create temp destination on demand now so this test case is no longer |
| // valid. |
| // |
| // public void testTempDestinationsAreNotAutoCreated() throws Exception { |
| // |
| // // Setup a first connection |
| // StubConnection connection1 = createConnection(); |
| // ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| // connection1.send(connectionInfo1); |
| // connection1.send(sessionInfo1); |
| // connection1.send(producerInfo1); |
| // |
| // destination = |
| // ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", |
| // destinationType); |
| // |
| // // Should not be able to send to a non-existant temp destination. |
| // try { |
| // connection1.request(createMessage(producerInfo1, destination, |
| // deliveryMode)); |
| // fail("Expected JMSException."); |
| // } catch ( JMSException success ) { |
| // } |
| // |
| // } |
| |
| |
| public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testExclusiveQueueDeliversToOnlyOneConsumer() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQQueue("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(1); |
| consumerInfo1.setExclusive(true); |
| connection1.send(consumerInfo1); |
| |
| // Send a message.. this should make consumer 1 the exclusive owner. |
| connection1.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setPrefetchSize(1); |
| consumerInfo2.setExclusive(true); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.request(consumerInfo2); |
| |
| // Second message should go to consumer 1 even though consumer 2 is |
| // ready |
| // for dispatch. |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Acknowledge the first 2 messages |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| // Close the first consumer. |
| connection1.send(closeConsumerInfo(consumerInfo1)); |
| |
| // The last two messages should now go the the second consumer. |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection2); |
| assertNotNull(m1); |
| connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void initCombosForTestWildcardConsume() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); |
| } |
| |
| public void testWildcardConsume() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| // setup the wildcard consumer. |
| ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("WILD.*.TEST", |
| destinationType); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // These two message should NOT match the wild card. |
| connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.CARD", |
| destinationType), |
| deliveryMode)); |
| connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.TEST", |
| destinationType), |
| deliveryMode)); |
| |
| // These two message should match the wild card. |
| ActiveMQDestination d1 = ActiveMQDestination.createDestination("WILD.CARD.TEST", destinationType); |
| connection1.send(createMessage(producerInfo1, d1, deliveryMode)); |
| |
| Message m = receiveMessage(connection1); |
| assertNotNull(m); |
| assertEquals(d1, m.getDestination()); |
| |
| ActiveMQDestination d2 = ActiveMQDestination.createDestination("WILD.FOO.TEST", destinationType); |
| connection1.request(createMessage(producerInfo1, d2, deliveryMode)); |
| m = receiveMessage(connection1); |
| assertNotNull(m); |
| assertEquals(d2, m.getDestination()); |
| |
| assertNoMessagesLeft(connection1); |
| connection1.send(closeConnectionInfo(connectionInfo1)); |
| } |
| |
| public void initCombosForTestCompositeConsume() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); |
| } |
| |
| public void testCompositeConsume() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| // setup the composite consumer. |
| ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", |
| destinationType); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination); |
| consumerInfo1.setRetroactive(true); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // Publish to the two destinations |
| ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType); |
| ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType); |
| |
| // Send a message to each destination . |
| connection1.send(createMessage(producerInfo1, destinationA, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destinationB, deliveryMode)); |
| |
| // The consumer should get both messages. |
| for (int i = 0; i < 2; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| } |
| |
| assertNoMessagesLeft(connection1); |
| connection1.send(closeConnectionInfo(connectionInfo1)); |
| } |
| |
| public void initCombosForTestCompositeSend() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); |
| } |
| |
| public void testCompositeSend() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA); |
| consumerInfo1.setRetroactive(true); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.request(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| |
| ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB); |
| consumerInfo2.setRetroactive(true); |
| consumerInfo2.setPrefetchSize(100); |
| connection2.request(consumerInfo2); |
| |
| // Send the messages to the composite destination. |
| ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", |
| destinationType); |
| for (int i = 0; i < 4; i++) { |
| connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode)); |
| } |
| |
| // The messages should have been delivered to both the A and B |
| // destination. |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| Message m2 = receiveMessage(connection2); |
| |
| assertNotNull(m1); |
| assertNotNull(m2); |
| |
| assertEquals(m1.getMessageId(), m2.getMessageId()); |
| assertEquals(compositeDestination, m1.getOriginalDestination()); |
| assertEquals(compositeDestination, m2.getOriginalDestination()); |
| |
| connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); |
| |
| } |
| |
| assertNoMessagesLeft(connection1); |
| assertNoMessagesLeft(connection2); |
| |
| connection1.send(closeConnectionInfo(connectionInfo1)); |
| connection2.send(closeConnectionInfo(connectionInfo2)); |
| } |
| |
| public void initCombosForTestConnectionCloseCascades() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"), |
| new ActiveMQQueue("TEST")}); |
| } |
| |
| public void testConnectionCloseCascades() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setNoLocal(true); |
| connection1.request(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(producerInfo2); |
| |
| // Send the messages |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| // give the async ack a chance to perculate and validate all are currently consumed |
| Message msg = receiveMessage(connection1, MAX_NULL_WAIT); |
| assertNull("all messages were received " + msg, msg); |
| |
| // Close the connection, this should in turn close the consumer. |
| connection1.request(closeConnectionInfo(connectionInfo1)); |
| |
| // Send another message, connection1 should not get the message. |
| connection2.request(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| assertNull("no message received", receiveMessage(connection1, MAX_NULL_WAIT)); |
| } |
| |
| public void initCombosForTestSessionCloseCascades() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"), |
| new ActiveMQQueue("TEST")}); |
| } |
| |
| public void testSessionCloseCascades() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setNoLocal(true); |
| connection1.request(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(producerInfo2); |
| |
| // Send the messages |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| // Close the session, this should in turn close the consumer. |
| connection1.request(closeSessionInfo(sessionInfo1)); |
| |
| // Send another message, connection1 should not get the message. |
| connection2.request(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| Message msg = receiveMessage(connection1,MAX_NULL_WAIT); |
| assertNull("no message received from connection1 after session close", msg); |
| } |
| |
| public void initCombosForTestConsumerClose() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST"), |
| new ActiveMQQueue("TEST")}); |
| } |
| |
| public void testConsumerClose() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setNoLocal(true); |
| connection1.request(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(producerInfo2); |
| |
| // Send the messages |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| connection2.send(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); |
| } |
| |
| // give the async ack a chance to perculate and validate all are currently consumed |
| // use receive rather than poll as broker info is sent async and may still need to be dequeued |
| Message result = receiveMessage(connection1, MAX_NULL_WAIT); |
| assertNull("no more messages " + result, result); |
| |
| // Close the consumer. |
| connection1.request(closeConsumerInfo(consumerInfo1)); |
| |
| // Send another message, connection1 should not get the message. |
| connection2.request(createMessage(producerInfo2, destination, deliveryMode)); |
| |
| result = receiveMessage(connection1, MAX_NULL_WAIT); |
| assertNull("no message received after close " + result, result); |
| } |
| |
| public void initCombosForTestTopicNoLocal() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testTopicNoLocal() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQTopic("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setRetroactive(true); |
| consumerInfo1.setPrefetchSize(100); |
| consumerInfo1.setNoLocal(true); |
| connection1.send(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(producerInfo2); |
| |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setRetroactive(true); |
| consumerInfo2.setPrefetchSize(100); |
| consumerInfo2.setNoLocal(true); |
| connection2.send(consumerInfo2); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| |
| // The 2nd connection should get the messages. |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection2); |
| assertNotNull(m1); |
| } |
| |
| // Send a message with the 2nd connection |
| Message message = createMessage(producerInfo2, destination, deliveryMode); |
| connection2.send(message); |
| |
| // The first connection should not see the initial 4 local messages sent |
| // but should |
| // see the messages from connection 2. |
| Message m = receiveMessage(connection1); |
| assertNotNull(m); |
| assertEquals(message.getMessageId(), m.getMessageId()); |
| |
| assertNoMessagesLeft(connection1); |
| assertNoMessagesLeft(connection2); |
| } |
| |
| public void initCombosForTopicDispatchIsBroadcast() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| } |
| |
| public void testTopicDispatchIsBroadcast() throws Exception { |
| |
| ActiveMQDestination destination = new ActiveMQTopic("TEST"); |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo1); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setRetroactive(true); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // Setup a second connection |
| StubConnection connection2 = createConnection(); |
| ConnectionInfo connectionInfo2 = createConnectionInfo(); |
| SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); |
| consumerInfo2.setRetroactive(true); |
| consumerInfo2.setPrefetchSize(100); |
| connection2.send(connectionInfo2); |
| connection2.send(sessionInfo2); |
| connection2.send(consumerInfo2); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo1, destination, deliveryMode)); |
| |
| // Get the messages |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| m1 = receiveMessage(connection2); |
| assertNotNull(m1); |
| } |
| } |
| |
| public void initCombosForTestQueueDispatchedAreRedeliveredOnConsumerClose() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); |
| } |
| |
| public void testQueueDispatchedAreRedeliveredOnConsumerClose() throws Exception { |
| |
| // Setup a first connection |
| StubConnection connection1 = createConnection(); |
| ConnectionInfo connectionInfo1 = createConnectionInfo(); |
| SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo1); |
| connection1.send(connectionInfo1); |
| connection1.send(sessionInfo1); |
| connection1.send(producerInfo); |
| |
| destination = createDestinationInfo(connection1, connectionInfo1, destinationType); |
| |
| ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo1.setPrefetchSize(100); |
| connection1.send(consumerInfo1); |
| |
| // Send the messages |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection1.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| long lastDeliveredSeq = -1; |
| // Get the messages |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| assertFalse(m1.isRedelivered()); |
| lastDeliveredSeq = m1.getMessageId().getBrokerSequenceId(); |
| } |
| // Close the consumer without sending any ACKS. |
| RemoveInfo removeInfo = closeConsumerInfo(consumerInfo1); |
| removeInfo.setLastDeliveredSequenceId(lastDeliveredSeq); |
| connection1.send(removeInfo); |
| |
| // Drain any in flight messages.. |
| while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) { |
| } |
| |
| // Add the second consumer |
| ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination); |
| consumerInfo2.setPrefetchSize(100); |
| connection1.send(consumerInfo2); |
| |
| // Make sure the messages were re delivered to the 2nd consumer. |
| for (int i = 0; i < 4; i++) { |
| Message m1 = receiveMessage(connection1); |
| assertNotNull(m1); |
| assertTrue(m1.isRedelivered()); |
| } |
| } |
| |
| public void initCombosForTestQueueBrowseMessages() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); |
| } |
| |
| public void testQueueBrowseMessages() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Use selector to skip first message. |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setBrowser(true); |
| connection.send(consumerInfo); |
| |
| for (int i = 0; i < 4; i++) { |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| } |
| |
| assertNoMessagesLeft(connection); |
| } |
| |
| public void initCombosForTestQueueSendThenAddConsumer() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); |
| } |
| |
| public void testQueueSendThenAddConsumer() throws Exception { |
| |
| // Start a producer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| // Send a message to the broker. |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Start the consumer |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| connection.send(consumerInfo); |
| |
| // Make sure the message was delivered. |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| |
| } |
| |
| public void initCombosForTestQueueAckRemovesMessage() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); |
| } |
| |
| public void testQueueAckRemovesMessage() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| Message message1 = createMessage(producerInfo, destination, deliveryMode); |
| Message message2 = createMessage(producerInfo, destination, deliveryMode); |
| connection.send(message1); |
| connection.send(message2); |
| |
| // Make sure the message was delivered. |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| connection.request(consumerInfo); |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| assertEquals(m.getMessageId(), message1.getMessageId()); |
| |
| assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); |
| connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); |
| connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); |
| assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 1); |
| |
| } |
| |
| public void initCombosForTestSelectorSkipsMessages() { |
| addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST_TOPIC"), |
| new ActiveMQQueue("TEST_QUEUE")}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testSelectorSkipsMessages() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setSelector("JMSType='last'"); |
| connection.send(consumerInfo); |
| |
| Message message1 = createMessage(producerInfo, destination, deliveryMode); |
| message1.setType("first"); |
| Message message2 = createMessage(producerInfo, destination, deliveryMode); |
| message2.setType("last"); |
| connection.send(message1); |
| connection.send(message2); |
| |
| // Use selector to skip first message. |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| assertEquals(m.getMessageId(), message2.getMessageId()); |
| connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); |
| connection.send(closeConsumerInfo(consumerInfo)); |
| |
| assertNoMessagesLeft(connection); |
| } |
| |
| public void initCombosForTestAddConsumerThenSend() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testAddConsumerThenSend() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| connection.send(consumerInfo); |
| |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Make sure the message was delivered. |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| } |
| |
| public void initCombosForTestConsumerPrefetchAtOne() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testConsumerPrefetchAtOne() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setPrefetchSize(1); |
| connection.send(consumerInfo); |
| |
| // Send 2 messages to the broker. |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Make sure only 1 message was delivered. |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| assertNoMessagesLeft(connection); |
| |
| } |
| |
| public void initCombosForTestConsumerPrefetchAtTwo() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testConsumerPrefetchAtTwo() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setPrefetchSize(2); |
| connection.send(consumerInfo); |
| |
| // Send 3 messages to the broker. |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Make sure only 1 message was delivered. |
| Message m = receiveMessage(connection); |
| assertNotNull(m); |
| m = receiveMessage(connection); |
| assertNotNull(m); |
| assertNoMessagesLeft(connection); |
| |
| } |
| |
| public void initCombosForTestConsumerPrefetchAndDeliveredAck() { |
| addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), |
| Integer.valueOf(DeliveryMode.PERSISTENT)}); |
| addCombinationValues("destinationType", |
| new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), |
| Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); |
| } |
| |
| public void testConsumerPrefetchAndDeliveredAck() throws Exception { |
| |
| // Start a producer and consumer |
| StubConnection connection = createConnection(); |
| ConnectionInfo connectionInfo = createConnectionInfo(); |
| SessionInfo sessionInfo = createSessionInfo(connectionInfo); |
| ProducerInfo producerInfo = createProducerInfo(sessionInfo); |
| connection.send(connectionInfo); |
| connection.send(sessionInfo); |
| connection.send(producerInfo); |
| |
| destination = createDestinationInfo(connection, connectionInfo, destinationType); |
| |
| ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); |
| consumerInfo.setPrefetchSize(1); |
| connection.request(consumerInfo); |
| |
| // Send 3 messages to the broker. |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.send(createMessage(producerInfo, destination, deliveryMode)); |
| connection.request(createMessage(producerInfo, destination, deliveryMode)); |
| |
| // Make sure only 1 message was delivered. |
| Message m1 = receiveMessage(connection); |
| assertNotNull(m1); |
| |
| assertNoMessagesLeft(connection); |
| |
| // Acknowledge the first message. This should cause the next message to |
| // get dispatched. |
| connection.request(createAck(consumerInfo, m1, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| |
| Message m2 = receiveMessage(connection); |
| assertNotNull(m2); |
| connection.request(createAck(consumerInfo, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| |
| Message m3 = receiveMessage(connection); |
| assertNotNull(m3); |
| connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE)); |
| } |
| |
| public void testGetServices() throws Exception { |
| assertTrue(broker.getServices().length != 0); |
| } |
| |
| public static Test suite() { |
| return suite(BrokerTest.class); |
| } |
| |
| public static void main(String[] args) { |
| junit.textui.TestRunner.run(suite()); |
| } |
| |
| } |