| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker.jmx; |
| |
| import java.io.BufferedReader; |
| import java.io.InputStreamReader; |
| import java.net.URI; |
| import java.net.URL; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.Connection; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.management.MBeanServer; |
| import javax.management.MBeanServerInvocationHandler; |
| import javax.management.MalformedObjectNameException; |
| import javax.management.ObjectName; |
| import javax.management.openmbean.CompositeData; |
| import javax.management.openmbean.TabularData; |
| |
| import junit.textui.TestRunner; |
| |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.ActiveMQSession; |
| import org.apache.activemq.BlobMessage; |
| import org.apache.activemq.EmbeddedBrokerTestSupport; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.region.BaseDestination; |
| import org.apache.activemq.broker.region.policy.PolicyEntry; |
| import org.apache.activemq.broker.region.policy.PolicyMap; |
| import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; |
| import org.apache.activemq.command.ActiveMQBlobMessage; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQTempQueue; |
| import org.apache.activemq.util.JMXSupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A test case of the various MBeans in ActiveMQ. If you want to look at the |
| * various MBeans after the test has been run then run this test case as a |
| * command line application. |
| * |
| * |
| */ |
| public class MBeanTest extends EmbeddedBrokerTestSupport { |
| private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); |
| |
| private static boolean waitForKeyPress; |
| |
| protected MBeanServer mbeanServer; |
| protected String domain = "org.apache.activemq"; |
| protected String clientID = "foo"; |
| |
| protected Connection connection; |
| protected boolean transacted; |
| protected int authMode = Session.AUTO_ACKNOWLEDGE; |
| protected static final int MESSAGE_COUNT = 2*BaseDestination.MAX_PAGE_SIZE; |
| |
| /** |
| * When you run this test case from the command line it will pause before |
| * terminating so that you can look at the MBeans state for debugging |
| * purposes. |
| */ |
| public static void main(String[] args) { |
| waitForKeyPress = true; |
| TestRunner.run(MBeanTest.class); |
| } |
| |
| public void testConnectors() throws Exception{ |
| ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); |
| BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); |
| assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); |
| |
| } |
| |
| public void testMBeans() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnection(connection); |
| |
| // test all the various MBeans now we have a producer, consumer and |
| // messages on a queue |
| assertSendViaMBean(); |
| assertQueueBrowseWorks(); |
| assertCreateAndDestroyDurableSubscriptions(); |
| assertConsumerCounts(); |
| } |
| |
| public void testMoveMessages() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnection(connection); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| CompositeData[] compdatalist = queue.browse(); |
| int initialQueueSize = compdatalist.length; |
| if (initialQueueSize == 0) { |
| fail("There is no message in the queue:"); |
| } |
| else { |
| echo("Current queue size: " + initialQueueSize); |
| } |
| int messageCount = initialQueueSize; |
| String[] messageIDs = new String[messageCount]; |
| for (int i = 0; i < messageCount; i++) { |
| CompositeData cdata = compdatalist[i]; |
| String messageID = (String) cdata.get("JMSMessageID"); |
| assertNotNull("Should have a message ID for message " + i, messageID); |
| messageIDs[i] = messageID; |
| } |
| |
| assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); |
| |
| echo("About to move " + messageCount + " messages"); |
| |
| String newDestination = getSecondDestinationString(); |
| for (String messageID : messageIDs) { |
| echo("Moving message: " + messageID); |
| queue.moveMessageTo(messageID, newDestination); |
| } |
| |
| echo("Now browsing the queue"); |
| compdatalist = queue.browse(); |
| int actualCount = compdatalist.length; |
| echo("Current queue size: " + actualCount); |
| assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); |
| |
| echo("Now browsing the second queue"); |
| |
| queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); |
| QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| long newQueuesize = queueNew.getQueueSize(); |
| echo("Second queue size: " + newQueuesize); |
| assertEquals("Unexpected number of messages ",messageCount, newQueuesize); |
| |
| // check memory usage migration |
| assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); |
| assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); |
| assertTrue("use cache", queueNew.isUseCache()); |
| assertTrue("cache enabled", queueNew.isCacheEnabled()); |
| } |
| |
| public void testRemoveMessages() throws Exception { |
| ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); |
| BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); |
| broker.addQueue(getDestinationString()); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| String msg1 = queue.sendTextMessage("message 1"); |
| String msg2 = queue.sendTextMessage("message 2"); |
| |
| assertTrue(queue.removeMessage(msg2)); |
| |
| connection = connectionFactory.createConnection(); |
| connection.start(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| ActiveMQDestination dest = createDestination(); |
| |
| MessageConsumer consumer = session.createConsumer(dest); |
| Message message = consumer.receive(1000); |
| assertNotNull(message); |
| assertEquals(msg1, message.getJMSMessageID()); |
| |
| String msg3 = queue.sendTextMessage("message 3"); |
| message = consumer.receive(1000); |
| assertNotNull(message); |
| assertEquals(msg3, message.getJMSMessageID()); |
| |
| message = consumer.receive(1000); |
| assertNull(message); |
| |
| } |
| |
| public void testRetryMessages() throws Exception { |
| // lets speed up redelivery |
| ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; |
| factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); |
| factory.getRedeliveryPolicy().setMaximumRedeliveries(1); |
| factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); |
| factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); |
| factory.getRedeliveryPolicy().setUseExponentialBackOff(false); |
| factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); |
| |
| connection = connectionFactory.createConnection(); |
| useConnection(connection); |
| |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| long initialQueueSize = queue.getQueueSize(); |
| echo("current queue size: " + initialQueueSize); |
| assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); |
| |
| // lets create a duff consumer which keeps rolling back... |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); |
| Message message = consumer.receive(5000); |
| while (message != null) { |
| echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); |
| session.rollback(); |
| message = consumer.receive(2000); |
| } |
| consumer.close(); |
| session.close(); |
| |
| |
| // now lets get the dead letter queue |
| Thread.sleep(1000); |
| |
| ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost"); |
| QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); |
| |
| long initialDlqSize = dlq.getQueueSize(); |
| CompositeData[] compdatalist = dlq.browse(); |
| int dlqQueueSize = compdatalist.length; |
| if (dlqQueueSize == 0) { |
| fail("There are no messages in the queue:"); |
| } |
| else { |
| echo("Current DLQ queue size: " + dlqQueueSize); |
| } |
| int messageCount = dlqQueueSize; |
| String[] messageIDs = new String[messageCount]; |
| for (int i = 0; i < messageCount; i++) { |
| CompositeData cdata = compdatalist[i]; |
| String messageID = (String) cdata.get("JMSMessageID"); |
| assertNotNull("Should have a message ID for message " + i, messageID); |
| messageIDs[i] = messageID; |
| } |
| |
| int dlqMemUsage = dlq.getMemoryPercentUsage(); |
| assertTrue("dlq has some memory usage", dlqMemUsage > 0); |
| assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); |
| |
| |
| echo("About to retry " + messageCount + " messages"); |
| |
| for (String messageID : messageIDs) { |
| echo("Retrying message: " + messageID); |
| dlq.retryMessage(messageID); |
| } |
| |
| long queueSize = queue.getQueueSize(); |
| compdatalist = queue.browse(); |
| int actualCount = compdatalist.length; |
| echo("Orginal queue size is now " + queueSize); |
| echo("Original browse queue size: " + actualCount); |
| |
| long dlqSize = dlq.getQueueSize(); |
| echo("DLQ size: " + dlqSize); |
| |
| assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); |
| assertEquals("queue size", initialQueueSize, queueSize); |
| assertEquals("browse queue size", initialQueueSize, actualCount); |
| |
| assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); |
| } |
| |
| public void testMoveMessagesBySelector() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnection(connection); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| String newDestination = getSecondDestinationString(); |
| queue.moveMatchingMessagesTo("counter > 2", newDestination); |
| |
| queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); |
| |
| queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| int movedSize = MESSAGE_COUNT-3; |
| assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); |
| |
| // now lets remove them by selector |
| queue.removeMatchingMessages("counter > 2"); |
| |
| assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); |
| assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); |
| } |
| |
| public void testCopyMessagesBySelector() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnection(connection); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| String newDestination = getSecondDestinationString(); |
| long queueSize = queue.getQueueSize(); |
| queue.copyMatchingMessagesTo("counter > 2", newDestination); |
| |
| |
| |
| queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); |
| |
| queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); |
| assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); |
| // now lets remove them by selector |
| queue.removeMatchingMessages("counter > 2"); |
| |
| assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); |
| assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); |
| } |
| |
| |
| protected void assertSendViaMBean() throws Exception { |
| String queueName = getDestinationString() + ".SendMBBean"; |
| |
| ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); |
| echo("Create QueueView MBean..."); |
| BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); |
| broker.addQueue(queueName); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost"); |
| |
| echo("Create QueueView MBean..."); |
| QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| proxy.purge(); |
| |
| int count = 5; |
| for (int i = 0; i < count; i++) { |
| String body = "message:" + i; |
| |
| Map headers = new HashMap(); |
| headers.put("JMSCorrelationID", "MyCorrId"); |
| headers.put("JMSDeliveryMode", Boolean.FALSE); |
| headers.put("JMSXGroupID", "MyGroupID"); |
| headers.put("JMSXGroupSeq", 1234); |
| headers.put("JMSPriority", i + 1); |
| headers.put("JMSType", "MyType"); |
| headers.put("MyHeader", i); |
| headers.put("MyStringHeader", "StringHeader" + i); |
| |
| proxy.sendTextMessage(headers, body); |
| } |
| |
| CompositeData[] compdatalist = proxy.browse(); |
| if (compdatalist.length == 0) { |
| fail("There is no message in the queue:"); |
| } |
| String[] messageIDs = new String[compdatalist.length]; |
| |
| for (int i = 0; i < compdatalist.length; i++) { |
| CompositeData cdata = compdatalist[i]; |
| |
| if (i == 0) { |
| echo("Columns: " + cdata.getCompositeType().keySet()); |
| } |
| |
| assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId"); |
| assertComplexData(i, cdata, "JMSPriority", i + 1); |
| assertComplexData(i, cdata, "JMSType", "MyType"); |
| assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId"); |
| assertComplexData(i, cdata, "JMSDeliveryMode", "NON-PERSISTENT"); |
| String expected = "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}"; |
| // The order of the properties is different when using the ibm jdk. |
| if (System.getProperty("java.vendor").equals("IBM Corporation")) { |
| expected = "{MyHeader=" + i + ", MyStringHeader=StringHeader" + i + "}"; |
| } |
| assertComplexData(i, cdata, "PropertiesText", expected); |
| |
| Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); |
| assertEquals("intProperties size()", 1, intProperties.size()); |
| assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader")); |
| |
| Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES); |
| assertEquals("stringProperties size()", 1, stringProperties.size()); |
| assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); |
| |
| Map properties = CompositeDataHelper.getMessageUserProperties(cdata); |
| assertEquals("properties size()", 2, properties.size()); |
| assertEquals("properties.MyHeader", i, properties.get("MyHeader")); |
| assertEquals("properties.MyHeader", "StringHeader" + i, properties.get("MyStringHeader")); |
| |
| assertComplexData(i, cdata, "JMSXGroupSeq", 1234); |
| assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID"); |
| assertComplexData(i, cdata, "Text", "message:" + i); |
| |
| } |
| } |
| |
| protected void assertComplexData(int messageIndex, CompositeData cdata, String name, Object expected) { |
| Object value = cdata.get(name); |
| assertEquals("Message " + messageIndex + " CData field: " + name, expected, value); |
| } |
| |
| |
| protected void assertQueueBrowseWorks() throws Exception { |
| Integer mbeancnt = mbeanServer.getMBeanCount(); |
| echo("Mbean count :" + mbeancnt); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| echo("Create QueueView MBean..."); |
| QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| long concount = proxy.getConsumerCount(); |
| echo("Consumer Count :" + concount); |
| long messcount = proxy.getQueueSize(); |
| echo("current number of messages in the queue :" + messcount); |
| |
| // lets browse |
| CompositeData[] compdatalist = proxy.browse(); |
| if (compdatalist.length == 0) { |
| fail("There is no message in the queue:"); |
| } |
| String[] messageIDs = new String[compdatalist.length]; |
| |
| for (int i = 0; i < compdatalist.length; i++) { |
| CompositeData cdata = compdatalist[i]; |
| |
| if (i == 0) { |
| echo("Columns: " + cdata.getCompositeType().keySet()); |
| } |
| messageIDs[i] = (String)cdata.get("JMSMessageID"); |
| echo("message " + i + " : " + cdata.values()); |
| } |
| |
| TabularData table = proxy.browseAsTable(); |
| echo("Found tabular data: " + table); |
| assertTrue("Table should not be empty!", table.size() > 0); |
| |
| assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); |
| |
| String messageID = messageIDs[0]; |
| String newDestinationName = "queue://dummy.test.cheese"; |
| echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName); |
| proxy.copyMessageTo(messageID, newDestinationName); |
| |
| assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); |
| |
| messageID = messageIDs[1]; |
| echo("Attempting to remove: " + messageID); |
| proxy.removeMessage(messageID); |
| |
| assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize()); |
| |
| echo("Worked!"); |
| } |
| |
| protected void assertCreateAndDestroyDurableSubscriptions() throws Exception { |
| // lets create a new topic |
| ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); |
| echo("Create QueueView MBean..."); |
| BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); |
| |
| broker.addTopic(getDestinationString()); |
| |
| assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); |
| |
| String topicName = getDestinationString(); |
| String selector = null; |
| ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector); |
| broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector); |
| assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length); |
| |
| assertNotNull("Should have created an mbean name for the durable subscriber!", name1); |
| |
| LOG.info("Created durable subscriber with name: " + name1); |
| |
| // now lets try destroy it |
| broker.destroyDurableSubscriber(clientID, "subscriber1"); |
| assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length); |
| } |
| |
| protected void assertConsumerCounts() throws Exception { |
| ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); |
| BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); |
| |
| assertTrue("broker is not a slave", !broker.isSlave()); |
| // create 2 topics |
| broker.addTopic(getDestinationString() + "1"); |
| broker.addTopic(getDestinationString() + "2"); |
| |
| ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "1"); |
| ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination=" + getDestinationString() + "2"); |
| TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); |
| TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true); |
| |
| assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); |
| |
| String topicName = getDestinationString(); |
| String selector = null; |
| |
| // create 1 subscriber for each topic |
| broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector); |
| broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector); |
| |
| assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); |
| |
| // create 1 more subscriber for topic1 |
| broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector); |
| |
| assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); |
| |
| // destroy topic1 subscriber |
| broker.destroyDurableSubscriber(clientID, "topic1.subscriber1"); |
| |
| assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); |
| |
| // destroy topic2 subscriber |
| broker.destroyDurableSubscriber(clientID, "topic2.subscriber1"); |
| |
| assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); |
| |
| // destroy remaining topic1 subscriber |
| broker.destroyDurableSubscriber(clientID, "topic1.subscriber2"); |
| |
| assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); |
| assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); |
| } |
| |
| protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { |
| ObjectName objectName = new ObjectName(name); |
| if (mbeanServer.isRegistered(objectName)) { |
| echo("Bean Registered: " + objectName); |
| } else { |
| fail("Could not find MBean!: " + objectName); |
| } |
| return objectName; |
| } |
| |
| protected void setUp() throws Exception { |
| bindAddress = "tcp://localhost:61616"; |
| useTopic = false; |
| super.setUp(); |
| mbeanServer = broker.getManagementContext().getMBeanServer(); |
| } |
| |
| protected void tearDown() throws Exception { |
| if (waitForKeyPress) { |
| // We are running from the command line so let folks browse the |
| // mbeans... |
| System.out.println(); |
| System.out.println("Press enter to terminate the program."); |
| System.out.println("In the meantime you can use your JMX console to view the current MBeans"); |
| BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); |
| reader.readLine(); |
| } |
| |
| if (connection != null) { |
| connection.close(); |
| connection = null; |
| } |
| super.tearDown(); |
| } |
| |
| protected BrokerService createBroker() throws Exception { |
| BrokerService answer = new BrokerService(); |
| answer.setPersistent(false); |
| answer.setDeleteAllMessagesOnStartup(true); |
| answer.setUseJmx(true); |
| |
| // apply memory limit so that %usage is visible |
| PolicyMap policyMap = new PolicyMap(); |
| PolicyEntry defaultEntry = new PolicyEntry(); |
| defaultEntry.setMemoryLimit(1024*1024*4); |
| policyMap.setDefaultEntry(defaultEntry); |
| answer.setDestinationPolicy(policyMap); |
| |
| answer.addConnector(bindAddress); |
| return answer; |
| } |
| |
| protected void useConnection(Connection connection) throws Exception { |
| connection.setClientID(clientID); |
| connection.start(); |
| Session session = connection.createSession(transacted, authMode); |
| destination = createDestination(); |
| MessageProducer producer = session.createProducer(destination); |
| for (int i = 0; i < MESSAGE_COUNT; i++) { |
| Message message = session.createTextMessage("Message: " + i); |
| message.setIntProperty("counter", i); |
| message.setJMSCorrelationID("MyCorrelationID"); |
| message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); |
| message.setJMSType("MyType"); |
| message.setJMSPriority(5); |
| producer.send(message); |
| } |
| Thread.sleep(1000); |
| } |
| |
| |
| protected void useConnectionWithBlobMessage(Connection connection) throws Exception { |
| connection.setClientID(clientID); |
| connection.start(); |
| ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); |
| destination = createDestination(); |
| MessageProducer producer = session.createProducer(destination); |
| for (int i = 0; i < MESSAGE_COUNT; i++) { |
| BlobMessage message = session.createBlobMessage(new URL("http://foo.bar/test")); |
| message.setIntProperty("counter", i); |
| message.setJMSCorrelationID("MyCorrelationID"); |
| message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); |
| message.setJMSType("MyType"); |
| message.setJMSPriority(5); |
| producer.send(message); |
| } |
| Thread.sleep(1000); |
| } |
| |
| protected void useConnectionWithByteMessage(Connection connection) throws Exception { |
| connection.setClientID(clientID); |
| connection.start(); |
| ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); |
| destination = createDestination(); |
| MessageProducer producer = session.createProducer(destination); |
| for (int i = 0; i < MESSAGE_COUNT; i++) { |
| BytesMessage message = session.createBytesMessage(); |
| message.writeBytes(("Message: " + i).getBytes()); |
| message.setIntProperty("counter", i); |
| message.setJMSCorrelationID("MyCorrelationID"); |
| message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); |
| message.setJMSType("MyType"); |
| message.setJMSPriority(5); |
| producer.send(message); |
| } |
| Thread.sleep(1000); |
| } |
| |
| protected void echo(String text) { |
| LOG.info(text); |
| } |
| |
| |
| protected String getSecondDestinationString() { |
| return "test.new.destination." + getClass() + "." + getName(); |
| } |
| |
| |
| public void testTempQueueJMXDelete() throws Exception { |
| connection = connectionFactory.createConnection(); |
| |
| connection.setClientID(clientID); |
| connection.start(); |
| Session session = connection.createSession(transacted, authMode); |
| ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue(); |
| Thread.sleep(1000); |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost"); |
| |
| // should not throw an exception |
| mbeanServer.getObjectInstance(queueViewMBeanName); |
| |
| tQueue.delete(); |
| Thread.sleep(1000); |
| try { |
| // should throw an exception |
| mbeanServer.getObjectInstance(queueViewMBeanName); |
| |
| fail("should be deleted already!"); |
| } catch (Exception e) { |
| // expected! |
| } |
| |
| } |
| |
| // Test for AMQ-3029 |
| public void testBrowseBlobMessages() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnectionWithBlobMessage(connection); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| CompositeData[] compdatalist = queue.browse(); |
| int initialQueueSize = compdatalist.length; |
| if (initialQueueSize == 0) { |
| fail("There is no message in the queue:"); |
| } |
| else { |
| echo("Current queue size: " + initialQueueSize); |
| } |
| int messageCount = initialQueueSize; |
| String[] messageIDs = new String[messageCount]; |
| for (int i = 0; i < messageCount; i++) { |
| CompositeData cdata = compdatalist[i]; |
| String messageID = (String) cdata.get("JMSMessageID"); |
| assertNotNull("Should have a message ID for message " + i, messageID); |
| |
| messageIDs[i] = messageID; |
| } |
| |
| assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); |
| } |
| |
| public void testBrowseBytesMessages() throws Exception { |
| connection = connectionFactory.createConnection(); |
| useConnectionWithByteMessage(connection); |
| |
| ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); |
| |
| QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); |
| |
| CompositeData[] compdatalist = queue.browse(); |
| int initialQueueSize = compdatalist.length; |
| if (initialQueueSize == 0) { |
| fail("There is no message in the queue:"); |
| } |
| else { |
| echo("Current queue size: " + initialQueueSize); |
| } |
| int messageCount = initialQueueSize; |
| String[] messageIDs = new String[messageCount]; |
| for (int i = 0; i < messageCount; i++) { |
| CompositeData cdata = compdatalist[i]; |
| String messageID = (String) cdata.get("JMSMessageID"); |
| assertNotNull("Should have a message ID for message " + i, messageID); |
| messageIDs[i] = messageID; |
| |
| Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW); |
| assertNotNull("should be a preview", preview); |
| assertTrue("not empty", preview.length > 0); |
| } |
| |
| assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); |
| |
| // consume all the messages |
| echo("Attempting to consume all bytes messages from: " + destination); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumer = session.createConsumer(destination); |
| for (int i=0; i<MESSAGE_COUNT; i++) { |
| Message message = consumer.receive(5000); |
| assertNotNull(message); |
| assertTrue(message instanceof BytesMessage); |
| } |
| consumer.close(); |
| session.close(); |
| } |
| } |