| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.systest.management.jmx; |
| |
| import org.apache.commons.lang.time.FastDateFormat; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.management.common.mbeans.ManagedBroker; |
| import org.apache.qpid.management.common.mbeans.ManagedQueue; |
| import org.apache.qpid.server.queue.AMQQueueFactory; |
| import org.apache.qpid.server.queue.NotificationCheckTest; |
| import org.apache.qpid.server.queue.SimpleAMQQueueTest; |
| import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; |
| import org.apache.qpid.test.utils.JMXTestUtils; |
| import org.apache.qpid.test.utils.QpidBrokerTestCase; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import javax.management.Notification; |
| import javax.management.NotificationListener; |
| import javax.management.openmbean.CompositeData; |
| import javax.management.openmbean.TabularData; |
| import javax.naming.NamingException; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * Tests the JMX API for the Managed Queue. |
| * |
| */ |
| public class QueueManagementTest extends QpidBrokerTestCase |
| { |
| |
| private static final Logger LOGGER = Logger.getLogger(QueueManagementTest.class); |
| |
| private static final String VIRTUAL_HOST = "test"; |
| private static final String TEST_QUEUE_DESCRIPTION = "my description"; |
| |
| private JMXTestUtils _jmxUtils; |
| private Connection _connection; |
| private Session _session; |
| |
| private String _sourceQueueName; |
| private String _destinationQueueName; |
| private Destination _sourceQueue; |
| private Destination _destinationQueue; |
| private ManagedQueue _managedSourceQueue; |
| private ManagedQueue _managedDestinationQueue; |
| |
| public void setUp() throws Exception |
| { |
| _jmxUtils = new JMXTestUtils(this); |
| _jmxUtils.setUp(); |
| |
| super.setUp(); |
| _sourceQueueName = getTestQueueName() + "_src"; |
| _destinationQueueName = getTestQueueName() + "_dest"; |
| |
| createConnectionAndSession(); |
| |
| _sourceQueue = _session.createQueue(_sourceQueueName); |
| _destinationQueue = _session.createQueue(_destinationQueueName); |
| createQueueOnBroker(_sourceQueue); |
| createQueueOnBroker(_destinationQueue); |
| |
| _jmxUtils.open(); |
| |
| createManagementInterfacesForQueues(); |
| } |
| |
| public void tearDown() throws Exception |
| { |
| if (_jmxUtils != null) |
| { |
| _jmxUtils.close(); |
| } |
| super.tearDown(); |
| } |
| |
| public void testQueueAttributes() throws Exception |
| { |
| Queue queue = _session.createQueue(getTestQueueName()); |
| createQueueOnBroker(queue); |
| |
| final String queueName = queue.getQueueName(); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals("Unexpected name", queueName, managedQueue.getName()); |
| assertEquals("Unexpected queue type", "standard", managedQueue.getQueueType()); |
| } |
| |
| public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception |
| { |
| Queue tmpQueue = _session.createTemporaryQueue(); |
| |
| final String queueName = tmpQueue.getQueueName(); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertNotNull(_connection.getClientID()); |
| assertEquals("Unexpected owner", _connection.getClientID(), managedQueue.getOwner()); |
| } |
| |
| public void testNonExclusiveQueueHasNoOwner() throws Exception |
| { |
| Queue nonExclusiveQueue = _session.createQueue(getTestQueueName()); |
| createQueueOnBroker(nonExclusiveQueue); |
| |
| final String queueName = nonExclusiveQueue.getQueueName(); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertNull("Unexpected owner", managedQueue.getOwner()); |
| } |
| |
| public void testSetNewQueueDescriptionOnExistingQueue() throws Exception |
| { |
| Queue queue = _session.createQueue(getTestQueueName()); |
| createQueueOnBroker(queue); |
| |
| final String queueName = queue.getQueueName(); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertNull("Unexpected description", managedQueue.getDescription()); |
| |
| managedQueue.setDescription(TEST_QUEUE_DESCRIPTION); |
| assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); |
| } |
| |
| public void testNewQueueWithDescription() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); |
| ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); |
| } |
| |
| /** |
| * Requires persistent store. |
| */ |
| public void testQueueDescriptionSurvivesRestart() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); |
| |
| ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); |
| |
| ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); |
| |
| restartBroker(); |
| |
| managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); |
| } |
| |
| /** |
| * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests |
| * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. |
| */ |
| public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception |
| { |
| final String queueName = getName(); |
| final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); |
| |
| final Integer deliveryCount = 1; |
| final Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); |
| managedBroker.createNewQueue(queueName, null, true, arguments); |
| |
| // Ensure the queue exists |
| assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); |
| assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); |
| } |
| |
| /** |
| * Requires 0-10 as relies on ADDR addresses. |
| * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange |
| */ |
| public void testGetSetAlternateExchange() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| String altExchange = "amq.fanout"; |
| String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); |
| Queue queue = _session.createQueue(addrWithAltExch); |
| |
| createQueueOnBroker(queue); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); |
| |
| String newAltExch = "amq.topic"; |
| managedQueue.setAlternateExchange(newAltExch); |
| assertEquals("Unexpected alternate exchange after set", newAltExch, managedQueue.getAlternateExchange()); |
| } |
| |
| /** |
| * Requires 0-10 as relies on ADDR addresses. |
| */ |
| public void testRemoveAlternateExchange() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| String altExchange = "amq.fanout"; |
| String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); |
| Queue queue = _session.createQueue(addrWithAltExch); |
| |
| createQueueOnBroker(queue); |
| |
| final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); |
| |
| managedQueue.setAlternateExchange(""); |
| assertNull("Unexpected alternate exchange after set", managedQueue.getAlternateExchange()); |
| } |
| |
| /** |
| * Requires persistent store |
| * Requires 0-10 as relies on ADDR addresses. |
| */ |
| public void testAlternateExchangeSurvivesRestart() throws Exception |
| { |
| String nonMandatoryExchangeName = "exch" + getName(); |
| |
| final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); |
| managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true); |
| |
| String queueName1 = getTestQueueName() + "1"; |
| String altExchange1 = "amq.fanout"; |
| String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1); |
| Queue queue1 = _session.createQueue(addr1WithAltExch); |
| |
| String queueName2 = getTestQueueName() + "2"; |
| String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2); |
| Queue queue2 = _session.createQueue(addr2WithoutAltExch); |
| |
| createQueueOnBroker(queue1); |
| createQueueOnBroker(queue2); |
| |
| ManagedQueue managedQueue1 = _jmxUtils.getManagedQueue(queueName1); |
| assertEquals("Newly created queue1 does not have expected alternate exchange", altExchange1, managedQueue1.getAlternateExchange()); |
| |
| ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2); |
| assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange()); |
| |
| String altExchange2 = nonMandatoryExchangeName; |
| managedQueue2.setAlternateExchange(altExchange2); |
| |
| restartBroker(); |
| |
| managedQueue1 = _jmxUtils.getManagedQueue(queueName1); |
| assertEquals("Queue1 does not have expected alternate exchange after restart", altExchange1, managedQueue1.getAlternateExchange()); |
| |
| managedQueue2 = _jmxUtils.getManagedQueue(queueName2); |
| assertEquals("Queue2 does not have expected updated alternate exchange after restart", altExchange2, managedQueue2.getAlternateExchange()); |
| } |
| |
| /** |
| * Tests the ability to receive queue alerts as JMX notifications. |
| * |
| * @see NotificationCheckTest |
| * @see SimpleAMQQueueTest#testNotificationFiredAsync() |
| * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue() |
| */ |
| public void testQueueNotification() throws Exception |
| { |
| final String queueName = getName(); |
| final long maximumMessageCount = 3; |
| |
| Queue queue = _session.createQueue(queueName); |
| createQueueOnBroker(queue); |
| |
| ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |
| managedQueue.setMaximumMessageCount(maximumMessageCount); |
| |
| RecordingNotificationListener listener = new RecordingNotificationListener(1); |
| |
| _jmxUtils.addNotificationListener(_jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName), listener, null, null); |
| |
| // Send two messages - this should *not* trigger the notification |
| sendMessage(_session, queue, 2); |
| |
| assertEquals("Premature notification received", 0, listener.getNumberOfNotificationsReceived()); |
| |
| // A further message should trigger the message count alert |
| sendMessage(_session, queue, 1); |
| |
| listener.awaitExpectedNotifications(5, TimeUnit.SECONDS); |
| |
| assertEquals("Unexpected number of JMX notifications received", 1, listener.getNumberOfNotificationsReceived()); |
| |
| Notification notification = listener.getLastNotification(); |
| assertEquals("Unexpected notification message", "MESSAGE_COUNT_ALERT 3: Maximum count on queue threshold (3) breached.", notification.getMessage()); |
| } |
| |
| /** |
| * Tests {@link ManagedQueue#viewMessages(long, long)} interface. |
| */ |
| public void testViewSingleMessage() throws Exception |
| { |
| final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); |
| syncSession(_session); |
| final Message sentMessage = sentMessages.get(0); |
| |
| assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); |
| |
| // Check the contents of the message |
| final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); |
| assertEquals("Unexpected number of rows in table", 1, tab.size()); |
| final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); |
| |
| final CompositeData row1 = rowItr.next(); |
| assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); |
| assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); |
| assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); |
| |
| // Check the contents of header (encoded in a string array) |
| final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER); |
| assertNotNull("Expected message header array", headerArray); |
| final Map<String, String> headers = headerArrayToMap(headerArray); |
| |
| final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID(); |
| final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(ManagedQueue.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp()); |
| assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID")); |
| assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority")); |
| assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp")); |
| } |
| |
| /** |
| * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. |
| */ |
| public void testMoveMessagesBetweenQueues() throws Exception |
| { |
| final int numberOfMessagesToSend = 10; |
| |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| |
| // Move first three messages to destination |
| long fromMessageId = amqMessagesIds.get(0); |
| long toMessageId = amqMessagesIds.get(2); |
| _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); |
| |
| assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); |
| |
| // Now move a further two messages to destination |
| fromMessageId = amqMessagesIds.get(7); |
| toMessageId = amqMessagesIds.get(8); |
| _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); |
| assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); |
| |
| assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); |
| } |
| |
| /** |
| * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. |
| */ |
| public void testCopyMessagesBetweenQueues() throws Exception |
| { |
| final int numberOfMessagesToSend = 10; |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| |
| // Copy first three messages to destination |
| long fromMessageId = amqMessagesIds.get(0); |
| long toMessageId = amqMessagesIds.get(2); |
| _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); |
| |
| assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| // Now copy a further two messages to destination |
| fromMessageId = amqMessagesIds.get(7); |
| toMessageId = amqMessagesIds.get(8); |
| _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); |
| assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); |
| } |
| |
| public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception |
| { |
| setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); |
| Connection asyncConnection = getConnection(); |
| asyncConnection.start(); |
| |
| final int numberOfMessagesToSend = 50; |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| |
| long fromMessageId = amqMessagesIds.get(0); |
| long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); |
| |
| CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); |
| AtomicInteger totalConsumed = new AtomicInteger(0); |
| startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); |
| |
| boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); |
| assertTrue("Did not read half of messages within time allowed", halfwayPointReached); |
| |
| _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); |
| |
| asyncConnection.stop(); |
| |
| // The exact number of messages moved will be non deterministic, as the number of messages processed |
| // by the consumer cannot be predicted. There is also the possibility that a message can remain |
| // on the source queue. This situation will arise if a message has been acquired by the consumer, but not |
| // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs. |
| // |
| // The number of messages moved + the number consumed + any messages remaining on source should |
| // *always* be equal to the number we originally sent. |
| |
| int numberOfMessagesReadByConsumer = totalConsumed.intValue(); |
| int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); |
| int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); |
| |
| LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer |
| + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue |
| + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue); |
| assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue); |
| } |
| |
| public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception |
| { |
| setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); |
| Connection asyncConnection = getConnection(); |
| asyncConnection.start(); |
| |
| final int numberOfMessagesToSend = 50; |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| long fromMessageId = amqMessagesIds.get(0); |
| long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); |
| |
| AtomicInteger totalConsumed = new AtomicInteger(0); |
| CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); |
| startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); |
| |
| _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); |
| |
| allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); |
| assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); |
| } |
| |
| /** |
| * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. |
| */ |
| public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception |
| { |
| final int numberOfMessagesToSend = 1; |
| |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| restartBroker(); |
| |
| createManagementInterfacesForQueues(); |
| createConnectionAndSession(); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| |
| // Move messages to destination |
| long messageId = amqMessagesIds.get(0); |
| _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName); |
| |
| assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue()); |
| |
| assertMessageIndicesOn(_destinationQueue, 0); |
| } |
| |
| /** |
| * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. |
| */ |
| public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception |
| { |
| final int numberOfMessagesToSend = 1; |
| |
| sendMessage(_session, _sourceQueue, numberOfMessagesToSend); |
| syncSession(_session); |
| assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); |
| |
| restartBroker(); |
| |
| createManagementInterfacesForQueues(); |
| createConnectionAndSession(); |
| |
| List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); |
| |
| // Move messages to destination |
| long messageId = amqMessagesIds.get(0); |
| _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName); |
| |
| assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue()); |
| assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue()); |
| |
| assertMessageIndicesOn(_destinationQueue, 0); |
| } |
| |
| @Override |
| public Message createNextMessage(Session session, int messageNumber) throws JMSException |
| { |
| Message message = session.createTextMessage(getContentForMessageNumber(messageNumber)); |
| message.setIntProperty(INDEX, messageNumber); |
| return message; |
| } |
| |
| private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, |
| final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception |
| { |
| Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumer = session.createConsumer(queue); |
| consumer.setMessageListener(new MessageListener() |
| { |
| |
| @Override |
| public void onMessage(Message arg0) |
| { |
| totalConsumed.incrementAndGet(); |
| requiredNumberOfMessagesRead.countDown(); |
| } |
| }); |
| } |
| |
| private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception |
| { |
| MessageConsumer consumer = _session.createConsumer(queue); |
| |
| for (int i : expectedIndices) |
| { |
| TextMessage message = (TextMessage)consumer.receive(1000); |
| assertNotNull("Expected message with index " + i, message); |
| assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); |
| assertEquals("Expected message content", getContentForMessageNumber(i), message.getText()); |
| } |
| |
| assertNull("Unexpected message encountered", consumer.receive(1000)); |
| } |
| |
| private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception |
| { |
| final SortedSet<Long> messageIds = new TreeSet<Long>(); |
| |
| final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); |
| final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); |
| while(rowItr.hasNext()) |
| { |
| final CompositeData row = rowItr.next(); |
| long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); |
| messageIds.add(amqMessageId); |
| } |
| |
| return new ArrayList<Long>(messageIds); |
| } |
| |
| /** |
| * |
| * Utility method to convert array of Strings in the form x = y into a |
| * map with key/value x => y. |
| * |
| */ |
| private Map<String,String> headerArrayToMap(final String[] headerArray) |
| { |
| final Map<String, String> headerMap = new HashMap<String, String>(); |
| final List<String> headerList = Arrays.asList(headerArray); |
| for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) |
| { |
| final String nameValuePair = iterator.next(); |
| final String[] nameValue = nameValuePair.split(" *= *", 2); |
| headerMap.put(nameValue[0], nameValue[1]); |
| } |
| return headerMap; |
| } |
| |
| private void createQueueOnBroker(Destination destination) throws JMSException |
| { |
| _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation |
| } |
| |
| private void syncSession(Session session) throws Exception |
| { |
| ((AMQSession<?,?>)session).sync(); |
| } |
| |
| private void createConnectionAndSession() throws JMSException, |
| NamingException |
| { |
| _connection = getConnection(); |
| _connection.start(); |
| _session = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| } |
| |
| private void createManagementInterfacesForQueues() |
| { |
| _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); |
| _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); |
| } |
| |
| private String getContentForMessageNumber(int msgCount) |
| { |
| return "Message count " + msgCount; |
| } |
| |
| private final class RecordingNotificationListener implements NotificationListener |
| { |
| private final CountDownLatch _notificationReceivedLatch; |
| private final AtomicInteger _numberOfNotifications; |
| private final AtomicReference<Notification> _lastNotification; |
| |
| private RecordingNotificationListener(int expectedNumberOfNotifications) |
| { |
| _notificationReceivedLatch = new CountDownLatch(expectedNumberOfNotifications); |
| _numberOfNotifications = new AtomicInteger(0); |
| _lastNotification = new AtomicReference<Notification>(); |
| } |
| |
| @Override |
| public void handleNotification(Notification notification, Object handback) |
| { |
| _lastNotification.set(notification); |
| _numberOfNotifications.incrementAndGet(); |
| _notificationReceivedLatch.countDown(); |
| } |
| |
| public int getNumberOfNotificationsReceived() |
| { |
| return _numberOfNotifications.get(); |
| } |
| |
| public Notification getLastNotification() |
| { |
| return _lastNotification.get(); |
| } |
| |
| public void awaitExpectedNotifications(long timeout, TimeUnit timeunit) throws InterruptedException |
| { |
| _notificationReceivedLatch.await(timeout, timeunit); |
| } |
| } |
| |
| } |