| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.usecases; |
| |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.jms.Connection; |
| import javax.jms.DeliveryMode; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.jms.Topic; |
| import javax.jms.TopicSubscriber; |
| import javax.management.ObjectName; |
| |
| import junit.framework.Test; |
| |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.CombinationTestSupport; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.jmx.DestinationViewMBean; |
| import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; |
| import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; |
| import org.apache.activemq.broker.region.policy.PolicyEntry; |
| import org.apache.activemq.broker.region.policy.PolicyMap; |
| import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.util.Wait; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class); |
| |
| private final ActiveMQDestination destination = new ActiveMQQueue("test"); |
| |
| private boolean optimizedDispatch = true; |
| private PendingQueueMessageStoragePolicy pendingQueuePolicy; |
| |
| private BrokerService broker; |
| private String connectionUri; |
| private Connection connection; |
| private Session session; |
| private MessageProducer producer; |
| |
| public static Test suite() { |
| return suite(ExpiredMessagesWithNoConsumerTest.class); |
| } |
| |
| public static void main(String[] args) { |
| junit.textui.TestRunner.run(suite()); |
| } |
| |
| protected void createBrokerWithMemoryLimit() throws Exception { |
| createBrokerWithMemoryLimit(800); |
| } |
| |
| protected void createBrokerWithMemoryLimit(int expireMessagesPeriod) throws Exception { |
| doCreateBroker(true, expireMessagesPeriod); |
| } |
| |
| protected void createBroker() throws Exception { |
| doCreateBroker(false, 800); |
| } |
| |
| private void doCreateBroker(boolean memoryLimit, int expireMessagesPeriod) throws Exception { |
| broker = new BrokerService(); |
| broker.setBrokerName("localhost"); |
| broker.setUseJmx(true); |
| broker.setDeleteAllMessagesOnStartup(true); |
| broker.addConnector("tcp://localhost:0"); |
| |
| PolicyMap policyMap = new PolicyMap(); |
| PolicyEntry defaultEntry = new PolicyEntry(); |
| defaultEntry.setOptimizedDispatch(optimizedDispatch); |
| defaultEntry.setExpireMessagesPeriod(expireMessagesPeriod); |
| defaultEntry.setMaxExpirePageSize(800); |
| |
| defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); |
| |
| if (memoryLimit) { |
| // so memory is not consumed by DLQ turn if off |
| defaultEntry.setDeadLetterStrategy(null); |
| defaultEntry.setMemoryLimit(200 * 1000); |
| } |
| |
| policyMap.setDefaultEntry(defaultEntry); |
| |
| broker.setDestinationPolicy(policyMap); |
| broker.start(); |
| broker.waitUntilStarted(); |
| |
| connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); |
| } |
| |
| public void testExpiredNonPersistentMessagesWithNoConsumer() throws Exception { |
| |
| createBrokerWithMemoryLimit(2000); |
| |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); |
| connection = factory.createConnection(); |
| session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| producer = session.createProducer(destination); |
| producer.setTimeToLive(1000); |
| producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); |
| connection.start(); |
| final long sendCount = 2000; |
| |
| final Thread producingThread = new Thread("Producing Thread") { |
| @Override |
| public void run() { |
| try { |
| int i = 0; |
| long tStamp = System.currentTimeMillis(); |
| while (i++ < sendCount) { |
| producer.send(session.createTextMessage("test")); |
| if (i%100 == 0) { |
| LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); |
| tStamp = System.currentTimeMillis() ; |
| } |
| |
| if (135 == i) { |
| // allow pending messages to expire, before usage limit kicks in to flush them |
| TimeUnit.SECONDS.sleep(5); |
| } |
| } |
| } catch (Throwable ex) { |
| ex.printStackTrace(); |
| } |
| } |
| }; |
| |
| producingThread.start(); |
| |
| assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| producingThread.join(TimeUnit.SECONDS.toMillis(3000)); |
| return !producingThread.isAlive(); |
| } |
| })); |
| |
| TimeUnit.SECONDS.sleep(5); |
| |
| final DestinationViewMBean view = createView(destination); |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| try { |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| return view.getDequeueCount() != 0 |
| && view.getDequeueCount() == view.getExpiredCount() |
| && view.getDequeueCount() == view.getEnqueueCount() |
| && view.getQueueSize() == 0; |
| } catch (Exception ignored) { |
| LOG.info(ignored.toString()); |
| } |
| return false; |
| } |
| }, Wait.MAX_WAIT_MILLIS * 10); |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage()); |
| assertEquals("0 queue", 0, view.getQueueSize()); |
| } |
| |
| |
| public void initCombosForTestExpiredMessagesWithNoConsumer() { |
| addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE}); |
| addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()}); |
| } |
| |
| public void testExpiredMessagesWithNoConsumer() throws Exception { |
| |
| createBrokerWithMemoryLimit(); |
| |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); |
| connection = factory.createConnection(); |
| session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| producer = session.createProducer(destination); |
| producer.setTimeToLive(1000); |
| connection.start(); |
| final long sendCount = 2000; |
| |
| final Thread producingThread = new Thread("Producing Thread") { |
| @Override |
| public void run() { |
| try { |
| int i = 0; |
| long tStamp = System.currentTimeMillis(); |
| while (i++ < sendCount) { |
| producer.send(session.createTextMessage("test")); |
| if (i%100 == 0) { |
| LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); |
| tStamp = System.currentTimeMillis() ; |
| } |
| } |
| } catch (Throwable ex) { |
| ex.printStackTrace(); |
| } |
| } |
| }; |
| |
| producingThread.start(); |
| |
| assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| producingThread.join(TimeUnit.SECONDS.toMillis(3000)); |
| return !producingThread.isAlive(); |
| } |
| })); |
| |
| final DestinationViewMBean view = createView(destination); |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| return sendCount == view.getExpiredCount(); |
| } |
| }, Wait.MAX_WAIT_MILLIS * 10); |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| assertEquals("Not all sent messages have expired", sendCount, view.getExpiredCount()); |
| assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage()); |
| } |
| |
| // first ack delivered after expiry |
| public void testExpiredMessagesWithVerySlowConsumer() throws Exception { |
| createBroker(); |
| final long queuePrefetch = 5; |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( |
| connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); |
| connection = factory.createConnection(); |
| session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| producer = session.createProducer(destination); |
| final int ttl = 4000; |
| producer.setTimeToLive(ttl); |
| |
| final long sendCount = 10; |
| final CountDownLatch receivedOneCondition = new CountDownLatch(1); |
| final CountDownLatch waitCondition = new CountDownLatch(1); |
| |
| MessageConsumer consumer = session.createConsumer(destination); |
| consumer.setMessageListener(new MessageListener() { |
| |
| @Override |
| public void onMessage(Message message) { |
| try { |
| LOG.info("Got my message: " + message); |
| receivedOneCondition.countDown(); |
| waitCondition.await(6, TimeUnit.MINUTES); |
| LOG.info("acking message: " + message); |
| message.acknowledge(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(e.toString()); |
| } |
| } |
| }); |
| |
| connection.start(); |
| |
| final Thread producingThread = new Thread("Producing Thread") { |
| @Override |
| public void run() { |
| try { |
| int i = 0; |
| long tStamp = System.currentTimeMillis(); |
| while (i++ < sendCount) { |
| producer.send(session.createTextMessage("test")); |
| if (i%100 == 0) { |
| LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); |
| tStamp = System.currentTimeMillis() ; |
| } |
| } |
| } catch (Throwable ex) { |
| ex.printStackTrace(); |
| } |
| } |
| }; |
| |
| producingThread.start(); |
| assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); |
| |
| assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| producingThread.join(1000); |
| return !producingThread.isAlive(); |
| } |
| }, Wait.MAX_WAIT_MILLIS * 10)); |
| |
| final DestinationViewMBean view = createView(destination); |
| |
| assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return queuePrefetch == view.getDispatchCount(); |
| } |
| })); |
| assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount(); |
| } |
| })); |
| |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| // let the ack happen |
| waitCondition.countDown(); |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return 0 == view.getInFlightCount(); |
| } |
| }); |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| assertEquals("inflight reduced to duck", |
| 0, view.getInFlightCount()); |
| assertEquals("size didn't get back to 0 ", 0, view.getQueueSize()); |
| assertEquals("dequeues didn't match sent/expired ", sendCount, view.getDequeueCount()); |
| |
| consumer.close(); |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return 0 == view.getInFlightCount(); |
| } |
| }); |
| assertEquals("inflight goes to zero on close", 0, view.getInFlightCount()); |
| |
| LOG.info("done: " + getName()); |
| } |
| |
| public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception { |
| createBroker(); |
| final long queuePrefetch = 600; |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( |
| connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); |
| connection = factory.createConnection(); |
| session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| producer = session.createProducer(destination); |
| final int ttl = 4000; |
| producer.setTimeToLive(ttl); |
| |
| final long sendCount = 1500; |
| final CountDownLatch receivedOneCondition = new CountDownLatch(1); |
| final CountDownLatch waitCondition = new CountDownLatch(1); |
| final AtomicLong received = new AtomicLong(); |
| MessageConsumer consumer = session.createConsumer(destination); |
| consumer.setMessageListener(new MessageListener() { |
| |
| @Override |
| public void onMessage(Message message) { |
| try { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Got my message: " + message); |
| } |
| receivedOneCondition.countDown(); |
| received.incrementAndGet(); |
| waitCondition.await(5, TimeUnit.MINUTES); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("acking message: " + message); |
| } |
| message.acknowledge(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(e.toString()); |
| } |
| } |
| }); |
| |
| connection.start(); |
| |
| final Thread producingThread = new Thread("Producing Thread") { |
| @Override |
| public void run() { |
| try { |
| int i = 0; |
| long tStamp = System.currentTimeMillis(); |
| while (i++ < sendCount) { |
| producer.send(session.createTextMessage("test")); |
| if (i%100 == 0) { |
| LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); |
| tStamp = System.currentTimeMillis() ; |
| } |
| } |
| } catch (Throwable ex) { |
| ex.printStackTrace(); |
| } |
| } |
| }; |
| |
| producingThread.start(); |
| assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); |
| |
| assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| producingThread.join(1000); |
| return !producingThread.isAlive(); |
| } |
| }, Wait.MAX_WAIT_MILLIS * 10)); |
| |
| final DestinationViewMBean view = createView(destination); |
| |
| assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return queuePrefetch == view.getDispatchCount(); |
| } |
| })); |
| |
| assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount(); |
| } |
| })); |
| |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| // let the ack happen |
| waitCondition.countDown(); |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return 0 == view.getInFlightCount(); |
| } |
| }); |
| LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() |
| + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() |
| + ", size= " + view.getQueueSize()); |
| |
| assertEquals("inflight didn't reduce to duck", |
| 0, view.getInFlightCount()); |
| assertEquals("size doesn't get back to 0 ", 0, view.getQueueSize()); |
| assertEquals("dequeues don't match sent/expired ", sendCount, view.getDequeueCount()); |
| |
| // produce some more |
| producer.setTimeToLive(0); |
| long tStamp = System.currentTimeMillis(); |
| for (int i=0; i<sendCount; i++) { |
| producer.send(session.createTextMessage("test-" + i)); |
| if (i%100 == 0) { |
| LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); |
| tStamp = System.currentTimeMillis() ; |
| } |
| } |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return received.get() >= sendCount; |
| } |
| }); |
| |
| consumer.close(); |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return 0 == view.getInFlightCount(); |
| } |
| }); |
| assertEquals("inflight did not go to zero on close", 0, view.getInFlightCount()); |
| |
| LOG.info("done: " + getName()); |
| } |
| |
| public void testExpireMessagesForDurableSubscriber() throws Exception { |
| createBroker(); |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); |
| connection = factory.createConnection(); |
| connection.setClientID("myConnection"); |
| session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| connection.start(); |
| Topic destination = session.createTopic("test"); |
| producer = session.createProducer(destination); |
| final int ttl = 1000; |
| producer.setTimeToLive(ttl); |
| |
| final long sendCount = 10; |
| |
| TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub"); |
| sub.close(); |
| |
| for (int i=0; i < sendCount; i++) { |
| producer.send(session.createTextMessage("test")); |
| } |
| |
| DestinationViewMBean view = createView((ActiveMQTopic)destination); |
| |
| LOG.info("messages sent"); |
| LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); |
| assertEquals(0, view.getExpiredCount()); |
| assertEquals(10, view.getEnqueueCount()); |
| |
| Thread.sleep(5000); |
| |
| LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); |
| assertEquals(10, view.getExpiredCount()); |
| assertEquals(10, view.getEnqueueCount()); |
| |
| final AtomicLong received = new AtomicLong(); |
| sub = session.createDurableSubscriber(destination, "mySub"); |
| sub.setMessageListener(new MessageListener() { |
| @Override |
| public void onMessage(Message message) { |
| received.incrementAndGet(); |
| } |
| }); |
| |
| LOG.info("Waiting for messages to arrive"); |
| |
| Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisified() throws Exception { |
| return received.get() >= sendCount; |
| } |
| }, 1000); |
| |
| LOG.info("received=" + received.get()); |
| LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); |
| |
| assertEquals(0, received.get()); |
| assertEquals(10, view.getExpiredCount()); |
| assertEquals(10, view.getEnqueueCount()); |
| } |
| |
| protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { |
| String domain = "org.apache.activemq"; |
| ObjectName name; |
| if (destination.isQueue()) { |
| name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test"); |
| } else { |
| name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test"); |
| } |
| |
| return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| connection.stop(); |
| broker.stop(); |
| broker.waitUntilStopped(); |
| } |
| |
| public boolean getOptimizedDispatch() { |
| return this.optimizedDispatch; |
| } |
| |
| public void setOptimizedDispatch(boolean option) { |
| this.optimizedDispatch = option; |
| } |
| |
| public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { |
| return this.pendingQueuePolicy; |
| } |
| |
| public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) { |
| this.pendingQueuePolicy = policy; |
| } |
| } |