| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.bugs; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Vector; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import javax.jms.BytesMessage; |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.DeliveryMode; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.jms.Topic; |
| import javax.jms.TopicConnection; |
| import javax.jms.TopicConnectionFactory; |
| import javax.jms.TopicPublisher; |
| import javax.jms.TopicSession; |
| import javax.jms.TopicSubscriber; |
| import javax.management.ObjectName; |
| import junit.framework.Test; |
| import org.apache.activemq.ActiveMQConnection; |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.CombinationTestSupport; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.jmx.BrokerView; |
| import org.apache.activemq.broker.region.policy.PolicyEntry; |
| import org.apache.activemq.broker.region.policy.PolicyMap; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; |
| import org.apache.activemq.store.kahadb.KahaDBStore; |
| import org.apache.activemq.util.IOHelper; |
| import org.apache.activemq.util.Wait; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A Test case for AMQ-1479 |
| */ |
| public class DurableConsumerTest extends CombinationTestSupport{ |
| private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class); |
| private static int COUNT = 1024; |
| private static String CONSUMER_NAME = "DURABLE_TEST"; |
| protected BrokerService broker; |
| |
| protected String bindAddress = "tcp://localhost:61616"; |
| |
| protected byte[] payload = new byte[1024 * 32]; |
| protected ConnectionFactory factory; |
| protected Vector<Exception> exceptions = new Vector<Exception>(); |
| |
| private static final String TOPIC_NAME = "failoverTopic"; |
| private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; |
| public boolean useDedicatedTaskRunner = false; |
| |
| private class SimpleTopicSubscriber implements MessageListener,ExceptionListener{ |
| |
| private TopicConnection topicConnection = null; |
| |
| public SimpleTopicSubscriber(String connectionURL,String clientId,String topicName) { |
| |
| ActiveMQConnectionFactory topicConnectionFactory = null; |
| TopicSession topicSession = null; |
| Topic topic = null; |
| TopicSubscriber topicSubscriber = null; |
| |
| topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL); |
| try { |
| |
| topic = new ActiveMQTopic(topicName); |
| topicConnection = topicConnectionFactory.createTopicConnection(); |
| topicConnection.setClientID((clientId)); |
| topicConnection.start(); |
| |
| topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); |
| topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId)); |
| topicSubscriber.setMessageListener(this); |
| |
| } catch (JMSException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| public void onMessage(Message arg0){ |
| } |
| |
| public void closeConnection(){ |
| if (topicConnection != null) { |
| try { |
| topicConnection.close(); |
| } catch (JMSException e) { |
| } |
| } |
| } |
| |
| public void onException(JMSException exception){ |
| exceptions.add(exception); |
| } |
| } |
| |
| private class MessagePublisher implements Runnable{ |
| private final boolean shouldPublish = true; |
| |
| public void run(){ |
| TopicConnectionFactory topicConnectionFactory = null; |
| TopicConnection topicConnection = null; |
| TopicSession topicSession = null; |
| Topic topic = null; |
| TopicPublisher topicPublisher = null; |
| Message message = null; |
| |
| topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); |
| try { |
| topic = new ActiveMQTopic(TOPIC_NAME); |
| topicConnection = topicConnectionFactory.createTopicConnection(); |
| topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); |
| topicPublisher = topicSession.createPublisher(topic); |
| message = topicSession.createMessage(); |
| } catch (Exception ex) { |
| exceptions.add(ex); |
| } |
| while (shouldPublish) { |
| try { |
| topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000); |
| } catch (JMSException ex) { |
| exceptions.add(ex); |
| } |
| try { |
| Thread.sleep(1); |
| } catch (Exception ex) { |
| } |
| } |
| } |
| } |
| |
| private void configurePersistence(BrokerService broker) throws Exception{ |
| File dataDirFile = new File("target/" + getName()); |
| KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter(); |
| kahaDBAdapter.setDirectory(dataDirFile); |
| broker.setPersistenceAdapter(kahaDBAdapter); |
| } |
| |
| public void testFailover() throws Exception{ |
| |
| configurePersistence(broker); |
| broker.start(); |
| |
| Thread publisherThread = new Thread(new MessagePublisher()); |
| publisherThread.start(); |
| final int numSubs = 100; |
| final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>(numSubs); |
| for (int i = 0; i < numSubs; i++) { |
| |
| final int id = i; |
| Thread thread = new Thread(new Runnable(){ |
| public void run(){ |
| SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); |
| list.add(s); |
| } |
| }); |
| thread.start(); |
| |
| } |
| |
| Wait.waitFor(new Wait.Condition(){ |
| @Override |
| public boolean isSatisified() throws Exception { |
| return numSubs == list.size(); |
| } |
| }); |
| |
| broker.stop(); |
| broker = createBroker(false); |
| configurePersistence(broker); |
| broker.start(); |
| Thread.sleep(10000); |
| for (SimpleTopicSubscriber s:list) { |
| s.closeConnection(); |
| } |
| assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); |
| } |
| |
| // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 |
| // with use dedicatedTaskRunner=true and produce OOM |
| public void initCombosForTestConcurrentDurableConsumer(){ |
| addCombinationValues("useDedicatedTaskRunner", new Object[] { Boolean.TRUE, Boolean.FALSE }); |
| } |
| |
| public void testConcurrentDurableConsumer() throws Exception{ |
| |
| broker.start(); |
| |
| factory = createConnectionFactory(); |
| final String topicName = getName(); |
| final int numMessages = 500; |
| int numConsumers = 1; |
| final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers); |
| final AtomicInteger receivedCount = new AtomicInteger(); |
| Runnable consumer = new Runnable(){ |
| public void run(){ |
| final String consumerName = Thread.currentThread().getName(); |
| int acked = 0; |
| int received = 0; |
| |
| try { |
| while (acked < numMessages / 2) { |
| // take one message and close, ack on occasion |
| Connection consumerConnection = factory.createConnection(); |
| ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false); |
| consumerConnection.setClientID(consumerName); |
| Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(topicName); |
| consumerConnection.start(); |
| |
| MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName); |
| |
| counsumerStarted.countDown(); |
| Message msg = null; |
| do { |
| msg = consumer.receive(5000); |
| if (msg != null) { |
| receivedCount.incrementAndGet(); |
| if (received != 0 && received % 100 == 0) { |
| LOG.info("Received msg: " + msg.getJMSMessageID()); |
| } |
| if (++received % 2 == 0) { |
| msg.acknowledge(); |
| acked++; |
| } |
| } |
| } while (msg == null); |
| |
| consumerConnection.close(); |
| } |
| assertTrue(received >= acked); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| exceptions.add(e); |
| } |
| } |
| }; |
| |
| ExecutorService executor = Executors.newFixedThreadPool(numConsumers); |
| |
| for (int i = 0; i < numConsumers; i++) { |
| executor.execute(consumer); |
| } |
| |
| assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS)); |
| |
| Connection producerConnection = factory.createConnection(); |
| ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false); |
| Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = producerSession.createTopic(topicName); |
| MessageProducer producer = producerSession.createProducer(topic); |
| producerConnection.start(); |
| for (int i = 0; i < numMessages; i++) { |
| BytesMessage msg = producerSession.createBytesMessage(); |
| msg.writeBytes(payload); |
| producer.send(msg); |
| if (i != 0 && i % 100 == 0) { |
| LOG.info("Sent msg " + i); |
| } |
| } |
| |
| executor.shutdown(); |
| executor.awaitTermination(30, TimeUnit.SECONDS); |
| |
| Wait.waitFor(new Wait.Condition(){ |
| public boolean isSatisified() throws Exception{ |
| LOG.info("receivedCount: " + receivedCount.get()); |
| return receivedCount.get() == numMessages; |
| } |
| }, 360 * 1000); |
| assertEquals("got required some messages", numMessages, receivedCount.get()); |
| assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); |
| } |
| |
| public void testConsumerRecover() throws Exception{ |
| doTestConsumer(true); |
| } |
| |
| public void testConsumer() throws Exception{ |
| doTestConsumer(false); |
| } |
| |
| public void testPrefetchViaBrokerConfig() throws Exception { |
| |
| Integer prefetchVal = new Integer(150); |
| PolicyEntry policyEntry = new PolicyEntry(); |
| policyEntry.setDurableTopicPrefetch(prefetchVal.intValue()); |
| policyEntry.setPrioritizedMessages(true); |
| PolicyMap policyMap = new PolicyMap(); |
| policyMap.setDefaultEntry(policyEntry); |
| broker.setDestinationPolicy(policyMap); |
| broker.start(); |
| |
| factory = createConnectionFactory(); |
| Connection consumerConnection = factory.createConnection(); |
| consumerConnection.setClientID(CONSUMER_NAME); |
| Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(getClass().getName()); |
| MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); |
| consumerConnection.start(); |
| |
| ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0]; |
| Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize"); |
| assertEquals(prefetchVal, prefetchFromSubView); |
| } |
| |
| public void doTestConsumer(boolean forceRecover) throws Exception{ |
| |
| if (forceRecover) { |
| configurePersistence(broker); |
| } |
| broker.start(); |
| |
| factory = createConnectionFactory(); |
| Connection consumerConnection = factory.createConnection(); |
| consumerConnection.setClientID(CONSUMER_NAME); |
| Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Topic topic = consumerSession.createTopic(getClass().getName()); |
| MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); |
| consumerConnection.start(); |
| consumerConnection.close(); |
| broker.stop(); |
| broker = createBroker(false); |
| if (forceRecover) { |
| configurePersistence(broker); |
| } |
| broker.start(); |
| |
| Connection producerConnection = factory.createConnection(); |
| |
| Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| MessageProducer producer = producerSession.createProducer(topic); |
| producerConnection.start(); |
| for (int i = 0; i < COUNT; i++) { |
| BytesMessage msg = producerSession.createBytesMessage(); |
| msg.writeBytes(payload); |
| producer.send(msg); |
| if (i != 0 && i % 1000 == 0) { |
| LOG.info("Sent msg " + i); |
| } |
| } |
| producerConnection.close(); |
| broker.stop(); |
| broker = createBroker(false); |
| if (forceRecover) { |
| configurePersistence(broker); |
| } |
| broker.start(); |
| |
| consumerConnection = factory.createConnection(); |
| consumerConnection.setClientID(CONSUMER_NAME); |
| consumerConnection.start(); |
| consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); |
| for (int i = 0; i < COUNT; i++) { |
| Message msg = consumer.receive(10000); |
| assertNotNull("Missing message: " + i, msg); |
| if (i != 0 && i % 1000 == 0) { |
| LOG.info("Received msg " + i); |
| } |
| |
| } |
| consumerConnection.close(); |
| |
| } |
| |
| @Override |
| protected void setUp() throws Exception{ |
| if (broker == null) { |
| broker = createBroker(true); |
| } |
| |
| super.setUp(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception{ |
| super.tearDown(); |
| if (broker != null) { |
| broker.stop(); |
| broker = null; |
| } |
| } |
| |
| protected Topic creatTopic(Session s,String destinationName) throws JMSException{ |
| return s.createTopic(destinationName); |
| } |
| |
| /** |
| * Factory method to create a new broker |
| * |
| * @throws Exception |
| */ |
| protected BrokerService createBroker(boolean deleteStore) throws Exception{ |
| BrokerService answer = new BrokerService(); |
| configureBroker(answer, deleteStore); |
| return answer; |
| } |
| |
| protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{ |
| answer.setDeleteAllMessagesOnStartup(deleteStore); |
| KahaDBStore kaha = new KahaDBStore(); |
| //kaha.setConcurrentStoreAndDispatchTopics(false); |
| File directory = new File("target/activemq-data/kahadb"); |
| if (deleteStore) { |
| IOHelper.deleteChildren(directory); |
| } |
| kaha.setDirectory(directory); |
| //kaha.setMaxAsyncJobs(10); |
| |
| answer.setPersistenceAdapter(kaha); |
| answer.addConnector(bindAddress); |
| answer.setUseShutdownHook(false); |
| answer.setAdvisorySupport(false); |
| answer.setDedicatedTaskRunner(useDedicatedTaskRunner); |
| } |
| |
| protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress); |
| factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner); |
| return factory; |
| } |
| |
| public static Test suite(){ |
| return suite(DurableConsumerTest.class); |
| } |
| |
| public static void main(String[] args){ |
| junit.textui.TestRunner.run(suite()); |
| } |
| } |